未验证 提交 9b8aa468 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9291 from taosdata/feature/dnode3

Feature/dnode3
......@@ -78,8 +78,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB, "create-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB, "alter-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB, "drop-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
......@@ -946,18 +945,6 @@ typedef struct {
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN];
int32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
int64_t ctime;
int64_t stime;
int64_t slidingTime;
int64_t interval;
} SStreamDesc;
typedef struct {
int32_t connId;
int32_t pid;
......
......@@ -73,7 +73,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg;
......
......@@ -8,8 +8,8 @@ add_subdirectory(db)
add_subdirectory(dnode)
# add_subdirectory(func)
# add_subdirectory(mnode)
# add_subdirectory(profile)
# add_subdirectory(show)
add_subdirectory(profile)
add_subdirectory(show)
add_subdirectory(stb)
# add_subdirectory(sync)
# add_subdirectory(telem)
......@@ -17,4 +17,4 @@ add_subdirectory(stb)
add_subdirectory(user)
add_subdirectory(vgroup)
# add_subdirectory(common)
add_subdirectory(sut)
add_executable(dnode_test_acct "")
target_sources(dnode_test_acct
PRIVATE
"acct.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. ACCT_SRC)
add_executable(dnode_test_acct ${ACCT_SRC})
target_link_libraries(
dnode_test_acct
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_acct
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,103 +9,59 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestAcct : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9012";
pServer = CreateServer("/tmp/dnode_test_user", fqdn, 9012, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9012);
taosMsleep(300);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_acct", 9012); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
};
SServer* DndTestAcct::pServer;
SClient* DndTestAcct::pClient;
int32_t DndTestAcct::connId;
Testbase DndTestAcct::test;
TEST_F(DndTestAcct, 01_CreateAcct) {
ASSERT_NE(pClient, nullptr);
SCreateAcctMsg* pReq = (SCreateAcctMsg*)rpcMallocCont(sizeof(SCreateAcctMsg));
int32_t contLen = sizeof(SCreateAcctMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateAcctMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_ACCT;
SCreateAcctMsg* pReq = (SCreateAcctMsg*)rpcMallocCont(contLen);
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_ACCT, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
}
TEST_F(DndTestAcct, 02_AlterAcct) {
ASSERT_NE(pClient, nullptr);
int32_t contLen = sizeof(SCreateAcctMsg);
SAlterAcctMsg* pReq = (SAlterAcctMsg*)rpcMallocCont(sizeof(SAlterAcctMsg));
SAlterAcctMsg* pReq = (SAlterAcctMsg*)rpcMallocCont(contLen);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterAcctMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_ACCT;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_ALTER_ACCT, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
}
TEST_F(DndTestAcct, 03_DropAcct) {
ASSERT_NE(pClient, nullptr);
SDropAcctMsg* pReq = (SDropAcctMsg*)rpcMallocCont(sizeof(SDropAcctMsg));
int32_t contLen = sizeof(SDropAcctMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropAcctMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_ACCT;
SDropAcctMsg* pReq = (SDropAcctMsg*)rpcMallocCont(contLen);
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_ACCT, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
}
TEST_F(DndTestAcct, 04_ShowAcct) {
ASSERT_NE(pClient, nullptr);
int32_t contLen = sizeof(SShowMsg);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(contLen);
pReq->type = TSDB_MGMT_TABLE_ACCT;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_SHOW, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
\ No newline at end of file
add_executable(dnode_test_cluster "")
target_sources(dnode_test_cluster
PRIVATE
"cluster.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. CLUSTER_SRC)
add_executable(dnode_test_cluster ${CLUSTER_SRC})
target_link_libraries(
dnode_test_cluster
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_cluster
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,162 +9,33 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestCluster : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_cluster", 9030); }
static void TearDownTestSuite() { test.Cleanup(); }
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9030";
pServer = CreateServer("/tmp/dnode_test_cluster", fqdn, 9030, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9030);
taosMsleep(1100);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
if (db != NULL) {
strcpy(pShow->db, db);
}
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt32() {
int32_t data = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
EXPECT_GT(data, 0);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestCluster::pServer;
SClient* DndTestCluster::pClient;
int32_t DndTestCluster::connId;
Testbase DndTestCluster::test;
TEST_F(DndTestCluster, 01_ShowCluster) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, "show cluster", 3, NULL);
CheckSchema(0, TSDB_DATA_TYPE_INT, 4, "id");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, "");
CHECK_META( "show cluster", 3);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
SendThenCheckShowRetrieveMsg(1);
CheckInt32();
CheckBinary(TSDB_CLUSTER_ID_LEN);
IgnoreInt32();
IgnoreBinary(TSDB_CLUSTER_ID_LEN);
CheckTimestamp();
}
\ No newline at end of file
add_executable(dnode_test_db "")
target_sources(dnode_test_db
PRIVATE
"db.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. DB_SRC)
add_executable(dnode_test_db ${DB_SRC})
target_link_libraries(
dnode_test_db
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_db
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,199 +9,52 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestDb : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9040";
pServer = CreateServer("/tmp/dnode_test_db", fqdn, 9040, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9040);
taosMsleep(1100);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_db", 9040); }
static void TearDownTestSuite() { test.Cleanup(); }
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
if (db != NULL) {
strcpy(pShow->db, db);
}
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt8(int8_t val) {
int8_t data = *((int8_t*)(pData + pos));
pos += sizeof(int8_t);
EXPECT_EQ(data, val);
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt32(int32_t val) {
int32_t data = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestDb::pServer;
SClient* DndTestDb::pClient;
int32_t DndTestDb::connId;
Testbase DndTestDb::test;
TEST_F(DndTestDb, 01_ShowDb) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups");
CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica");
CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum");
CheckSchema(5, TSDB_DATA_TYPE_SMALLINT, 2, "days");
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2");
CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "cache");
CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "blocks");
CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "minrows");
CheckSchema(10, TSDB_DATA_TYPE_INT, 4, "maxrows");
CheckSchema(11, TSDB_DATA_TYPE_TINYINT, 1, "wallevel");
CheckSchema(12, TSDB_DATA_TYPE_INT, 4, "fsync");
CheckSchema(13, TSDB_DATA_TYPE_TINYINT, 1, "comp");
CheckSchema(14, TSDB_DATA_TYPE_TINYINT, 1, "cachelast");
CheckSchema(15, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision");
CheckSchema(16, TSDB_DATA_TYPE_TINYINT, 1, "update");
SendThenCheckShowRetrieveMsg(0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
CHECK_META("show databases", 17);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_SMALLINT, 2, "days");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2");
CHECK_SCHEMA(7, TSDB_DATA_TYPE_INT, 4, "cache");
CHECK_SCHEMA(8, TSDB_DATA_TYPE_INT, 4, "blocks");
CHECK_SCHEMA(9, TSDB_DATA_TYPE_INT, 4, "minrows");
CHECK_SCHEMA(10, TSDB_DATA_TYPE_INT, 4, "maxrows");
CHECK_SCHEMA(11, TSDB_DATA_TYPE_TINYINT, 1, "wallevel");
CHECK_SCHEMA(12, TSDB_DATA_TYPE_INT, 4, "fsync");
CHECK_SCHEMA(13, TSDB_DATA_TYPE_TINYINT, 1, "comp");
CHECK_SCHEMA(14, TSDB_DATA_TYPE_TINYINT, 1, "cachelast");
CHECK_SCHEMA(15, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision");
CHECK_SCHEMA(16, TSDB_DATA_TYPE_TINYINT, 1, "update");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 0);
}
TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
int32_t contLen = sizeof(SCreateDbMsg);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -223,20 +76,16 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
pReq->cacheLastRow = 0;
pReq->ignoreExist = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
// taosMsleep(1000000);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
CHECK_META("show databases", 17);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp();
CheckInt16(2); // vgroups
......@@ -255,12 +104,15 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary("ms", 3); // precision
CheckInt8(0); // update
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_VGROUP, "show vgroups", 4, "1.d1");
CheckSchema(0, TSDB_DATA_TYPE_INT, 4, "vgId");
CheckSchema(1, TSDB_DATA_TYPE_INT, 4, "tables");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "v1_dnode");
CheckSchema(3, TSDB_DATA_TYPE_BINARY, 9 + VARSTR_HEADER_SIZE, "v1_status");
SendThenCheckShowRetrieveMsg(2);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_VGROUP, "1.d1");
CHECK_META("show vgroups", 4);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "vgId");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_INT, 4, "tables");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "v1_dnode");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, 9 + VARSTR_HEADER_SIZE, "v1_status");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt32(1);
CheckInt32(2);
CheckInt32(0);
......@@ -271,7 +123,9 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
CheckBinary("master", 9);
{
SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg));
int32_t contLen = sizeof(SAlterDbMsg);
SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->totalBlocks = htonl(12);
pReq->daysToKeep0 = htonl(300);
......@@ -282,19 +136,14 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
pReq->quorum = 2;
pReq->cacheLastRow = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_ALTER_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp();
CheckInt16(2); // vgroups
......@@ -314,19 +163,14 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
CheckInt8(0); // update
// restart
stopServer(pServer);
pServer = NULL;
uInfo("start all server");
test.Restart();
const char* fqdn = "localhost";
const char* firstEp = "localhost:9040";
pServer = startServer("/tmp/dnode_test_db", fqdn, 9040, firstEp);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
CHECK_META("show databases", 17);
uInfo("all server is running");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp();
CheckInt16(2); // vgroups
......@@ -346,27 +190,28 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
CheckInt8(0); // update
{
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(sizeof(SDropDbMsg));
strcpy(pReq->db, "1.d1");
int32_t contLen = sizeof(SDropDbMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_DB;
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
CHECK_META("show databases", 17);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 0);
}
TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
int32_t contLen = sizeof(SCreateDbMsg);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -388,33 +233,26 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
pReq->cacheLastRow = 0;
pReq->ignoreExist = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
CHECK_META("show databases", 17);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
{
SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(sizeof(SUseDbMsg));
int32_t contLen = sizeof(SUseDbMsg);
SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d2");
pReq->vgVersion = htonl(-1);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SUseDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_USE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_USE_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
......
add_executable(dnode_test_dnode "")
target_sources(dnode_test_dnode
PRIVATE
"dnode.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. DTEST_SRC)
add_executable(dnode_test_dnode ${DTEST_SRC})
target_link_libraries(
dnode_test_dnode
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_dnode
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,189 +9,62 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestDnode : public ::testing::Test {
public:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
void SetUp() override {}
void TearDown() override {}
public:
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
test.Init("/tmp/dnode_test_dnode1", 9041);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9041";
pServer1 = CreateServer("/tmp/dnode_test_dnode1", fqdn, 9041, firstEp);
pServer2 = CreateServer("/tmp/dnode_test_dnode2", fqdn, 9042, firstEp);
pServer3 = CreateServer("/tmp/dnode_test_dnode3", fqdn, 9043, firstEp);
pServer4 = CreateServer("/tmp/dnode_test_dnode4", fqdn, 9044, firstEp);
pServer5 = CreateServer("/tmp/dnode_test_dnode5", fqdn, 9045, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9041);
server2.Start("/tmp/dnode_test_dnode2", fqdn, 9042, firstEp);
server3.Start("/tmp/dnode_test_dnode3", fqdn, 9043, firstEp);
server4.Start("/tmp/dnode_test_dnode4", fqdn, 9044, firstEp);
server5.Start("/tmp/dnode_test_dnode5", fqdn, 9045, firstEp);
taosMsleep(300);
}
static void TearDownTestSuite() {
stopServer(pServer1);
stopServer(pServer2);
stopServer(pServer3);
stopServer(pServer4);
stopServer(pServer5);
dropClient(pClient);
pServer1 = NULL;
pServer2 = NULL;
pServer3 = NULL;
pServer4 = NULL;
pServer5 = NULL;
pClient = NULL;
}
static SServer* pServer1;
static SServer* pServer2;
static SServer* pServer3;
static SServer* pServer4;
static SServer* pServer5;
static SClient* pClient;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
strcpy(pShow->db, "");
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
server2.Stop();
server3.Stop();
server4.Stop();
server5.Stop();
test.Cleanup();
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
static Testbase test;
static TestServer server2;
static TestServer server3;
static TestServer server4;
static TestServer server5;
};
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
Testbase DndTestDnode::test;
TestServer DndTestDnode::server2;
TestServer DndTestDnode::server3;
TestServer DndTestDnode::server4;
TestServer DndTestDnode::server5;
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
TEST_F(DndTestDnode, 01_ShowDnode) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "max_vnodes");
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason");
SServer* DndTestDnode::pServer1;
SServer* DndTestDnode::pServer2;
SServer* DndTestDnode::pServer3;
SServer* DndTestDnode::pServer4;
SServer* DndTestDnode::pServer5;
SClient* DndTestDnode::pClient;
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
TEST_F(DndTestDnode, 01_ShowDnode) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
CheckSchema(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "max_vnodes");
CheckSchema(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason");
SendThenCheckShowRetrieveMsg(1);
CheckInt16(1);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckInt16(0);
......@@ -202,40 +75,36 @@ TEST_F(DndTestDnode, 01_ShowDnode) {
}
TEST_F(DndTestDnode, 02_ConfigDnode) {
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(sizeof(SCfgDnodeMsg));
int32_t contLen = sizeof(SCfgDnodeMsg);
SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCfgDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CONFIG_DNODE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CONFIG_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
{
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9042");
int32_t contLen = sizeof(SCreateDnodeMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9042");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(2);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckInt16(1);
CheckInt16(2);
CheckBinary("localhost:9041", TSDB_EP_LEN);
......@@ -252,22 +121,21 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
CheckBinary("", 24);
{
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(sizeof(SDropDnodeMsg));
pReq->dnodeId = htonl(2);
int32_t contLen = sizeof(SDropDnodeMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_DNODE;
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(1);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckInt16(1);
CheckBinary("localhost:9041", TSDB_EP_LEN);
CheckInt16(0);
......@@ -277,53 +145,44 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
CheckBinary("", 24);
{
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9043");
int32_t contLen = sizeof(SCreateDnodeMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9043");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
{
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9044");
int32_t contLen = sizeof(SCreateDnodeMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9044");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
{
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9045");
int32_t contLen = sizeof(SCreateDnodeMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
strcpy(pReq->ep, "localhost:9045");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(4);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
......@@ -355,31 +214,18 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) {
// restart
uInfo("stop all server");
stopServer(pServer1);
stopServer(pServer2);
stopServer(pServer3);
stopServer(pServer4);
stopServer(pServer5);
pServer1 = NULL;
pServer2 = NULL;
pServer3 = NULL;
pServer4 = NULL;
pServer5 = NULL;
uInfo("start all server");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9041";
pServer1 = startServer("/tmp/dnode_test_dnode1", fqdn, 9041, firstEp);
pServer3 = startServer("/tmp/dnode_test_dnode3", fqdn, 9043, firstEp);
pServer4 = startServer("/tmp/dnode_test_dnode4", fqdn, 9044, firstEp);
pServer5 = startServer("/tmp/dnode_test_dnode5", fqdn, 9045, firstEp);
uInfo("all server is running");
test.Restart();
server2.Restart();
server3.Restart();
server4.Restart();
server5.Restart();
taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(4);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "");
CHECK_META("show dnodes", 7);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 4);
CheckInt16(1);
CheckInt16(3);
CheckInt16(4);
......
add_executable(dndTestProfile "")
target_sources(dndTestProfile
PRIVATE
"profile.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. PROFILE_SRC)
add_executable(dnode_test_profile ${PROFILE_SRC})
target_link_libraries(
dndTestProfile
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dndTestProfile
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
dnode_test_profile
PUBLIC sut
)
add_test(
NAME dndTestProfile
COMMAND dndTestProfile
NAME dnode_test_profile
COMMAND dnode_test_profile
)
add_executable(dnode_test_show "")
target_sources(dnode_test_show
PRIVATE
"show.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. SHOW_SRC)
add_executable(dnode_test_show ${SHOW_SRC})
target_link_libraries(
dnode_test_show
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_show
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,226 +9,79 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestShow : public ::testing::Test {
protected:
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_show", 9091); }
static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
static void SetUpTestSuite() {
const char* user = "root";
const char* pass = "taosdata";
const char* path = "/tmp/dndTestShow";
const char* fqdn = "localhost";
uint16_t port = 9523;
pServer = createServer(path, fqdn, port);
ASSERT(pServer);
pClient = createClient(user, pass, fqdn, port);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
};
SServer* DndTestShow::pServer;
SClient* DndTestShow::pClient;
int32_t DndTestShow::connId;
Testbase DndTestShow::test;
TEST_F(DndTestShow, SShowMsg_01) {
ASSERT_NE(pClient, nullptr);
TEST_F(DndTestShow, 01_ShowMsg_InvalidMsgMax) {
int32_t contLen = sizeof(SShowMsg);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg));
pReq->pid = htonl(1234);
strcpy(pReq->app, "dndTestShow");
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(contLen);
pReq->type = TSDB_MGMT_TABLE_MAX;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SConnectMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_SHOW, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->connId = htonl(pRsp->connId);
EXPECT_EQ(pRsp->connId, 1);
connId = pRsp->connId;
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
TEST_F(DndTestShow, SShowMsg_02) {
ASSERT_NE(pClient, nullptr);
TEST_F(DndTestShow, 02_ShowMsg_InvalidMsgStart) {
int32_t contLen = sizeof(SShowMsg);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_MAX;
pReq->type = TSDB_MGMT_TABLE_START;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_SHOW, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
TEST_F(DndTestShow, SShowMsg_03) {
ASSERT_NE(pClient, nullptr);
TEST_F(DndTestShow, 02_ShowMsg_Conn) {
int32_t contLen = sizeof(SConnectMsg);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_START;
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(contLen);
pReq->pid = htonl(1234);
strcpy(pReq->app, "dnode_test_show");
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CONNECT, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_MSG_TYPE);
}
TEST_F(DndTestShow, SShowMsg_04) {
ASSERT_NE(pClient, nullptr);
int32_t showId = 0;
{
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pReq->type = TSDB_MGMT_TABLE_CONNS;
strcpy(pReq->db, "");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
SShowRsp* pRsp = (SShowRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->showId = htonl(pRsp->showId);
STableMetaMsg* pMeta = &pRsp->tableMeta;
pMeta->contLen = htonl(pMeta->contLen);
pMeta->numOfColumns = htons(pMeta->numOfColumns);
pMeta->sversion = htons(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid);
showId = pRsp->showId;
EXPECT_NE(pRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0);
EXPECT_STREQ(pMeta->tbFname, "");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0);
SSchema* pSchema = NULL;
pSchema = &pMeta->pSchema[0];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->pSchema[2];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "program");
pSchema = &pMeta->pSchema[3];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "pid");
pSchema = &pMeta->pSchema[4];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "ip:port");
pSchema = &pMeta->pSchema[5];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "login_time");
pSchema = &pMeta->pSchema[6];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "last_access");
}
{
SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pReq->showId = htonl(showId);
pReq->free = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SRetrieveTableMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 1);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0);
}
ASSERT_EQ(pMsg->code, 0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_CONNS, "");
STableMetaMsg* pMeta = test.GetShowMeta();
EXPECT_STREQ(pMeta->tbFname, "show connections");
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, 7);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
test.SendShowRetrieveMsg();
SRetrieveTableRsp* pRetrieveRsp = test.GetRetrieveRsp();
EXPECT_EQ(pRetrieveRsp->numOfRows, 1);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
EXPECT_EQ(pRetrieveRsp->completed, 1);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
}
add_executable(dnode_test_stb "")
target_sources(dnode_test_stb
PRIVATE
"stb.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. STB_SRC)
add_executable(dnode_test_stb ${STB_SRC})
target_link_libraries(
dnode_test_stb
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_stb
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,176 +9,27 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestStb : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_stb", 9101); }
static void TearDownTestSuite() { test.Cleanup(); }
const char* fqdn = "localhost";
const char* firstEp = "localhost:9101";
pServer = CreateServer("/tmp/dnode_test_stb", fqdn, 9101, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9101);
taosMsleep(1100);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
if (db != NULL) {
strcpy(pShow->db, db);
}
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt8(int8_t val) {
int8_t data = *((int8_t*)(pData + pos));
pos += sizeof(int8_t);
EXPECT_EQ(data, val);
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt32(int32_t val) {
int32_t data = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestStb::pServer;
SClient* DndTestStb::pClient;
int32_t DndTestStb::connId;
Testbase DndTestStb::test;
TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
int32_t contLen = sizeof(SCreateDbMsg);
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(contLen);
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
......@@ -200,13 +51,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
pReq->cacheLastRow = 0;
pReq->ignoreExist = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
......@@ -214,9 +59,9 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
int32_t cols = 2;
int32_t tags = 3;
int32_t size = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg);
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg);
SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(size);
SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(contLen);
strcpy(pReq->name, "1.d1.stb");
pReq->numOfTags = htonl(tags);
pReq->numOfColumns = htonl(cols);
......@@ -261,24 +106,21 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
strcpy(pSchema->name, "tag3");
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = size;
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_STB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_STB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(2, TSDB_DATA_TYPE_INT, 4, "columns");
CheckSchema(3, TSDB_DATA_TYPE_INT, 4, "tags");
test.SendShowMetaMsg(TSDB_MGMT_TABLE_STB, "1.d1");
CHECK_META("show stables", 4);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
SendThenCheckShowRetrieveMsg(1);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
CheckTimestamp();
CheckInt32(2);
......@@ -286,16 +128,12 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
// ----- meta ------
{
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(sizeof(STableInfoMsg));
strcpy(pReq->tableFname, "1.d1.stb");
int32_t contLen = sizeof(STableInfoMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(STableInfoMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_TABLE_META;
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen);
strcpy(pReq->tableFname, "1.d1.stb");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_TABLE_META, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
......@@ -336,39 +174,31 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
}
// restart
stopServer(pServer);
pServer = NULL;
test.Restart();
uInfo("start all server");
test.SendShowMetaMsg(TSDB_MGMT_TABLE_STB, "1.d1");
CHECK_META("show stables", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
const char* fqdn = "localhost";
const char* firstEp = "localhost:9101";
pServer = startServer("/tmp/dnode_test_stb", fqdn, 9101, firstEp);
uInfo("all server is running");
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
SendThenCheckShowRetrieveMsg(1);
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
CheckTimestamp();
CheckInt32(2);
CheckInt32(3);
{
SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(sizeof(SDropStbMsg));
strcpy(pReq->name, "1.d1.stb");
int32_t contLen = sizeof(SDropStbMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropStbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_STB;
SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(contLen);
strcpy(pReq->name, "1.d1.stb");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_STB, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
SendThenCheckShowRetrieveMsg(0);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_STB, "1.d1");
CHECK_META("show stables", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 0);
}
aux_source_directory(src SUT_SRC)
add_library(sut STATIC ${SUT_SRC})
target_link_libraries(
sut
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(
sut
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "deploy.h"
void initLog(const char* path) {
dDebugFlag = 207;
vDebugFlag = 0;
mDebugFlag = 207;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 0;
uDebugFlag = 143;
rpcDebugFlag = 0;
odbcDebugFlag = 0;
qDebugFlag = 0;
wDebugFlag = 0;
sDebugFlag = 0;
tsdbDebugFlag = 0;
cqDebugFlag = 0;
tscEmbeddedInUtil = 1;
taosRemoveDir(path);
taosMkDir(path);
char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", path);
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
printf("failed to init log file\n");
}
}
void* runServer(void* param) {
SServer* pServer = (SServer*)param;
while (1) {
taosMsleep(100);
pthread_testcancel();
}
}
void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
pOption->sver = 1;
pOption->numOfCores = 1;
pOption->numOfSupportMnodes = 1;
pOption->numOfSupportVnodes = 1;
pOption->numOfSupportQnodes = 1;
pOption->statusInterval = 1;
pOption->numOfThreadsPerCore = 1;
pOption->ratioOfQueryCores = 1;
pOption->maxShellConns = 1000;
pOption->shellActivityTimer = 30;
pOption->serverPort = port;
strcpy(pOption->dataDir, path);
snprintf(pOption->localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
snprintf(pOption->localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
snprintf(pOption->firstEp, TSDB_EP_LEN, "%s", firstEp);
}
SServer* startServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
taosMkDir(path);
SDnodeOpt option = {0};
initOption(&option, path, fqdn, port, firstEp);
SDnode* pDnode = dndInit(&option);
ASSERT(pDnode);
SServer* pServer = (SServer*)calloc(1, sizeof(SServer));
ASSERT(pServer);
pServer->pDnode = pDnode;
pServer->threadId = taosCreateThread(runServer, pServer);
ASSERT(pServer->threadId);
return pServer;
}
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
taosRemoveDir(path);
return startServer(path, fqdn, port, firstEp);
}
void stopServer(SServer* pServer) {
if (pServer == NULL) return;
if (pServer->threadId != NULL) {
taosDestoryThread(pServer->threadId);
}
if (pServer->pDnode != NULL) {
dndCleanup(pServer->pDnode);
pServer->pDnode = NULL;
}
}
void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SClient* pClient = (SClient*)parent;
pClient->pRsp = pMsg;
uInfo("response:%s from dnode, pCont:%p contLen:%d code:0x%X", taosMsg[pMsg->msgType], pMsg->pCont, pMsg->contLen,
pMsg->code);
tsem_post(&pClient->sem);
}
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port) {
SClient* pClient = (SClient*)calloc(1, sizeof(SClient));
ASSERT(pClient);
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = (char*)"DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processClientRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.user = (char*)user;
rpcInit.ckey = (char*)"key";
rpcInit.parent = pClient;
rpcInit.secret = (char*)secretEncrypt;
rpcInit.parent = pClient;
// rpcInit.spi = 1;
pClient->clientRpc = rpcOpen(&rpcInit);
ASSERT(pClient->clientRpc);
tsem_init(&pClient->sem, 0, 0);
strcpy(pClient->fqdn, fqdn);
pClient->port = port;
return pClient;
}
void dropClient(SClient* pClient) {
tsem_destroy(&pClient->sem);
rpcClose(pClient->clientRpc);
}
void sendMsg(SClient* pClient, SRpcMsg* pMsg) {
SEpSet epSet = {0};
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = pClient->port;
memcpy(epSet.fqdn[0], pClient->fqdn, TSDB_FQDN_LEN);
rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL);
tsem_wait(&pClient->sem);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TEST_BASE_H_
#define _TD_TEST_BASE_H_
#include <gtest/gtest.h>
#include "os.h"
#include "dnode.h"
#include "taosmsg.h"
#include "tconfig.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tnote.h"
#include "trpc.h"
#include "tthread.h"
#include "ulog.h"
#include "client.h"
#include "server.h"
class Testbase {
public:
void Init(const char* path, int16_t port);
void Cleanup();
void Restart();
SRpcMsg* SendMsg(int8_t msgType, void* pCont, int32_t contLen);
private:
void InitLog(const char* path);
private:
TestServer server;
TestClient client;
int32_t connId;
public:
void SendShowMetaMsg(int8_t showType, const char* db);
void SendShowRetrieveMsg();
STableMetaMsg* GetShowMeta();
SRetrieveTableRsp* GetRetrieveRsp();
int32_t GetMetaNum();
const char* GetMetaTbName();
int32_t GetMetaColId(int32_t index);
int8_t GetMetaType(int32_t index);
int32_t GetMetaBytes(int32_t index);
const char* GetMetaName(int32_t index);
const char* GetShowName();
int32_t GetShowRows();
int8_t GetShowInt8();
int16_t GetShowInt16();
int32_t GetShowInt32();
int64_t GetShowInt64();
int64_t GetShowTimestamp();
const char* GetShowBinary(int32_t len);
private:
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
#define CHECK_META(tbName, numOfColumns) \
{ \
EXPECT_EQ(test.GetMetaNum(), numOfColumns); \
EXPECT_STREQ(test.GetMetaTbName(), tbName); \
}
#define CHECK_SCHEMA(colId, type, bytes, colName) \
{ \
EXPECT_EQ(test.GetMetaType(colId), type); \
EXPECT_EQ(test.GetMetaBytes(colId), bytes); \
EXPECT_STREQ(test.GetMetaName(colId), colName); \
}
#define CheckBinary(val, len) \
{ EXPECT_STREQ(test.GetShowBinary(len), val); }
#define CheckInt8(val) \
{ EXPECT_EQ(test.GetShowInt8(), val); }
#define CheckInt16(val) \
{ EXPECT_EQ(test.GetShowInt16(), val); }
#define CheckInt32(val) \
{ EXPECT_EQ(test.GetShowInt32(), val); }
#define CheckInt64(val) \
{ EXPECT_EQ(test.GetShowInt64(), val); }
#define CheckTimestamp() \
{ EXPECT_GT(test.GetShowTimestamp(), 0); }
#define IgnoreBinary(len) \
{ test.GetShowBinary(len); }
#define IgnoreInt8() \
{ test.GetShowInt8(); }
#define IgnoreInt16() \
{ test.GetShowInt16(); }
#define IgnoreInt32() \
{ test.GetShowInt32(); }
#define IgnoreInt64() \
{ test.GetShowInt64(); }
#define IgnoreTimestamp() \
{ test.GetShowTimestamp(); }
#endif /* _TD_TEST_BASE_H_ */
\ No newline at end of file
......@@ -13,37 +13,24 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "os.h"
#ifndef _TD_TEST_CLIENT_H_
#define _TD_TEST_CLIENT_H_
#include "dnode.h"
#include "taosmsg.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "trpc.h"
#include "tthread.h"
#include "ulog.h"
#include "tdataformat.h"
class TestClient {
public:
bool Init(const char* user, const char* pass, const char* fqdn, uint16_t port);
void Cleanup();
typedef struct {
SDnode* pDnode;
pthread_t* threadId;
} SServer;
SRpcMsg* SendMsg(SRpcMsg* pMsg);
void SetRpcRsp(SRpcMsg* pRsp);
tsem_t* GetSem();
typedef struct {
private:
char fqdn[TSDB_FQDN_LEN];
uint16_t port;
void* clientRpc;
SRpcMsg* pRsp;
tsem_t sem;
} SClient;
void initLog(const char* path);
SServer* createServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
SServer* startServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
void stopServer(SServer* pServer);
SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port);
void dropClient(SClient* pClient);
void sendMsg(SClient* pClient, SRpcMsg* pMsg);
};
#endif /* _TD_TEST_CLIENT_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TEST_SERVER_H_
#define _TD_TEST_SERVER_H_
class TestServer {
public:
bool Start(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
void Stop();
void Restart();
private:
SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
bool DoStart();
private:
SDnode* pDnode;
pthread_t* threadId;
char path[PATH_MAX];
char fqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN];
uint16_t port;
};
#endif /* _TD_TEST_SERVER_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "base.h"
void Testbase::InitLog(const char* path) {
dDebugFlag = 207;
vDebugFlag = 0;
mDebugFlag = 207;
cDebugFlag = 0;
jniDebugFlag = 0;
tmrDebugFlag = 0;
uDebugFlag = 143;
rpcDebugFlag = 0;
odbcDebugFlag = 0;
qDebugFlag = 0;
wDebugFlag = 0;
sDebugFlag = 0;
tsdbDebugFlag = 0;
cqDebugFlag = 0;
tscEmbeddedInUtil = 1;
taosRemoveDir(path);
taosMkDir(path);
char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", path);
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
printf("failed to init log file\n");
}
}
void Testbase::Init(const char* path, int16_t port) {
char fqdn[] = "localhost";
char firstEp[TSDB_EP_LEN] = {0};
snprintf(firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
InitLog("/tmp/tdlog");
server.Start(path, fqdn, port, firstEp);
client.Init("root", "taosdata", fqdn, port);
taosMsleep(1100);
}
void Testbase::Cleanup() {
server.Stop();
client.Cleanup();
}
void Testbase::Restart() { server.Restart(); }
SRpcMsg* Testbase::SendMsg(int8_t msgType, void* pCont, int32_t contLen) {
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pCont;
rpcMsg.contLen = contLen;
rpcMsg.msgType = msgType;
return client.SendMsg(&rpcMsg);
}
void Testbase::SendShowMetaMsg(int8_t showType, const char* db) {
int32_t contLen = sizeof(SShowMsg);
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(contLen);
pShow->type = showType;
strcpy(pShow->db, db);
SRpcMsg* pMsg = SendMsg(TSDB_MSG_TYPE_SHOW, pShow, contLen);
SShowRsp* pShowRsp = (SShowRsp*)pMsg->pCont;
ASSERT(pShowRsp != nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
}
int32_t Testbase::GetMetaColId(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->colId = htonl(pSchema->colId);
return pSchema->colId;
}
int8_t Testbase::GetMetaType(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
return pSchema->type;
}
int32_t Testbase::GetMetaBytes(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
return pSchema->bytes;
}
const char* Testbase::GetMetaName(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
return pSchema->name;
}
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
const char* Testbase::GetMetaTbName() { return pMeta->tbFname; }
void Testbase::SendShowRetrieveMsg() {
int32_t contLen = sizeof(SRetrieveTableMsg);
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(contLen);
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg* pMsg = SendMsg(TSDB_MSG_TYPE_SHOW_RETRIEVE, pRetrieve, contLen);
pRetrieveRsp = (SRetrieveTableRsp*)pMsg->pCont;
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
pData = pRetrieveRsp->data;
pos = 0;
}
const char* Testbase::GetShowName() { return pMeta->tbFname; }
int8_t Testbase::GetShowInt8() {
int8_t data = *((int8_t*)(pData + pos));
pos += sizeof(int8_t);
return data;
}
int16_t Testbase::GetShowInt16() {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
return data;
}
int32_t Testbase::GetShowInt32() {
int32_t data = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
return data;
}
int64_t Testbase::GetShowInt64() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
return data;
}
int64_t Testbase::GetShowTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
return data;
}
const char* Testbase::GetShowBinary(int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
return data;
}
int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; }
STableMetaMsg* Testbase::GetShowMeta() { return pMeta; }
SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "base.h"
static void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
TestClient* client = (TestClient*)parent;
client->SetRpcRsp(pMsg);
uInfo("response:%s from dnode, code:0x%x", taosMsg[pMsg->msgType], pMsg->code);
tsem_post(client->GetSem());
}
void TestClient::SetRpcRsp(SRpcMsg* pRsp) { this->pRsp = pRsp; };
tsem_t* TestClient::GetSem() { return &sem; }
bool TestClient::Init(const char* user, const char* pass, const char* fqdn, uint16_t port) {
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = (char*)"DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = processClientRsp;
rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000;
rpcInit.user = (char*)user;
rpcInit.ckey = (char*)"key";
rpcInit.parent = this;
rpcInit.secret = (char*)secretEncrypt;
// rpcInit.spi = 1;
clientRpc = rpcOpen(&rpcInit);
ASSERT(clientRpc);
tsem_init(&sem, 0, 0);
strcpy(this->fqdn, fqdn);
this->port = port;
return true;
}
void TestClient::Cleanup() {
tsem_destroy(&sem);
rpcClose(clientRpc);
}
SRpcMsg* TestClient::SendMsg(SRpcMsg* pMsg) {
SEpSet epSet = {0};
epSet.inUse = 0;
epSet.numOfEps = 1;
epSet.port[0] = port;
memcpy(epSet.fqdn[0], fqdn, TSDB_FQDN_LEN);
rpcSendRequest(clientRpc, &epSet, pMsg, NULL);
tsem_wait(&sem);
return pRsp;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "base.h"
void* serverLoop(void* param) {
while (1) {
taosMsleep(100);
pthread_testcancel();
}
}
SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SDnodeOpt option = {0};
option.sver = 1;
option.numOfCores = 1;
option.numOfSupportMnodes = 1;
option.numOfSupportVnodes = 1;
option.numOfSupportQnodes = 1;
option.statusInterval = 1;
option.numOfThreadsPerCore = 1;
option.ratioOfQueryCores = 1;
option.maxShellConns = 1000;
option.shellActivityTimer = 30;
option.serverPort = port;
strcpy(option.dataDir, path);
snprintf(option.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
snprintf(option.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
snprintf(option.firstEp, TSDB_EP_LEN, "%s", firstEp);
return option;
}
bool TestServer::DoStart() {
SDnodeOpt option = BuildOption(path, fqdn, port, firstEp);
taosMkDir(path);
pDnode = dndInit(&option);
if (pDnode != NULL) {
return false;
}
threadId = taosCreateThread(serverLoop, NULL);
if (threadId != NULL) {
return false;
}
return true;
}
void TestServer::Restart() {
uInfo("start all server");
Stop();
DoStart();
uInfo("all server is running");
}
bool TestServer::Start(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
strcpy(this->path, path);
strcpy(this->fqdn, fqdn);
this->port = port;
strcpy(this->firstEp, firstEp);
taosRemoveDir(path);
return DoStart();
}
void TestServer::Stop() {
if (threadId != NULL) {
taosDestoryThread(threadId);
threadId = NULL;
}
if (pDnode != NULL) {
dndCleanup(pDnode);
pDnode = NULL;
}
}
add_executable(dnode_test_user "")
target_sources(dnode_test_user
PRIVATE
"user.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. USER_SRC)
add_executable(dnode_test_user ${USER_SRC})
target_link_libraries(
dnode_test_user
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_user
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,168 +9,34 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestUser : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9140";
pServer = CreateServer("/tmp/dnode_test_user", fqdn, 9140, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9140);
taosMsleep(300);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_user", 9140); }
static void TearDownTestSuite() { test.Cleanup(); }
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
strcpy(pShow->db, "");
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestUser::pServer;
SClient* DndTestUser::pClient;
int32_t DndTestUser::connId;
Testbase DndTestUser::test;
TEST_F(DndTestUser, 01_ShowUser) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "privilege");
CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(3, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "account");
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "name");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "privilege");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "account");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1);
SendThenCheckShowRetrieveMsg(1);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("super", 10);
CheckTimestamp();
......@@ -179,39 +45,35 @@ TEST_F(DndTestUser, 01_ShowUser) {
TEST_F(DndTestUser, 02_Create_Drop_Alter_User) {
{
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg));
int32_t contLen = sizeof(SCreateUserMsg);
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(contLen);
strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p1");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_USER, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
{
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg));
int32_t contLen = sizeof(SCreateUserMsg);
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(contLen);
strcpy(pReq->user, "u2");
strcpy(pReq->pass, "p2");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_USER, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(3);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 3);
CheckBinary("u1", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
......@@ -226,22 +88,23 @@ TEST_F(DndTestUser, 02_Create_Drop_Alter_User) {
CheckBinary("root", TSDB_USER_LEN);
{
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
int32_t contLen = sizeof(SAlterUserMsg);
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(contLen);
strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p2");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_ALTER_USER, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(3);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 3);
CheckBinary("u1", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
......@@ -256,21 +119,22 @@ TEST_F(DndTestUser, 02_Create_Drop_Alter_User) {
CheckBinary("root", TSDB_USER_LEN);
{
SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
strcpy(pReq->user, "u1");
int32_t contLen = sizeof(SDropUserMsg);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_USER;
SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(contLen);
strcpy(pReq->user, "u1");
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_USER, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(2);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
CheckBinary("super", 10);
......@@ -281,19 +145,14 @@ TEST_F(DndTestUser, 02_Create_Drop_Alter_User) {
CheckBinary("root", TSDB_USER_LEN);
// restart
stopServer(pServer);
pServer = NULL;
uInfo("start all server");
test.Restart();
const char* fqdn = "localhost";
const char* firstEp = "localhost:9140";
pServer = startServer("/tmp/dnode_test_user", fqdn, 9140, firstEp);
test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
CHECK_META("show users", 4);
uInfo("all server is running");
test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 2);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(2);
CheckBinary("root", TSDB_USER_LEN);
CheckBinary("u2", TSDB_USER_LEN);
CheckBinary("super", 10);
......
add_executable(dnode_test_vgroup "")
target_sources(dnode_test_vgroup
PRIVATE
"vgroup.cpp"
"../sut/deploy.cpp"
)
aux_source_directory(. VGROUP_SRC)
add_executable(dnode_test_vgroup ${VGROUP_SRC})
target_link_libraries(
dnode_test_vgroup
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_vgroup
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
PUBLIC sut
)
add_test(
......
......@@ -9,177 +9,28 @@
*
*/
#include "deploy.h"
#include "base.h"
class DndTestVgroup : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9150";
pServer = CreateServer("/tmp/dnode_test_vgroup", fqdn, 9150, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9150);
taosMsleep(1100);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vgroup", 9150); }
static void TearDownTestSuite() { test.Cleanup(); }
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
if (db != NULL) {
strcpy(pShow->db, db);
}
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt8(int8_t val) {
int8_t data = *((int8_t*)(pData + pos));
pos += sizeof(int8_t);
EXPECT_EQ(data, val);
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt32(int32_t val) {
int32_t data = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestVgroup::pServer;
SClient* DndTestVgroup::pClient;
int32_t DndTestVgroup::connId;
Testbase DndTestVgroup::test;
TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{
for (int i = 0; i < 3; ++i) {
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg));
int32_t contLen = sizeof(SCreateVnodeMsg);
SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
......@@ -209,13 +60,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
pReplica->port = htons(9150);
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_VNODE_IN, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
......@@ -223,7 +68,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{
for (int i = 0; i < 3; ++i) {
SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(sizeof(SAlterVnodeMsg));
int32_t contLen = sizeof(SAlterVnodeMsg);
SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
......@@ -253,13 +100,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
pReplica->port = htons(9150);
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_ALTER_VNODE_IN, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
......@@ -267,7 +108,9 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
{
for (int i = 0; i < 3; ++i) {
SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(sizeof(SDropVnodeMsg));
int32_t contLen = sizeof(SDropVnodeMsg);
SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(contLen);
pReq->vgId = htonl(2);
pReq->dnodeId = htonl(1);
strcpy(pReq->db, "1.d1");
......@@ -278,8 +121,7 @@ TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) {
rpcMsg.contLen = sizeof(SDropVnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_VNODE_IN, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
......
......@@ -23,29 +23,25 @@
#define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18
#define SUBQUERY_INFO_SIZE 6
#define QUERY_STREAM_SAVE_SIZE 20
#define QUERY_SAVE_SIZE 20
typedef struct {
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int32_t pid; // pid of app that invokes taosc
int64_t appStartTime; // app start time
int32_t id;
int8_t killed;
int8_t align;
uint16_t port;
uint32_t ip;
int64_t stime;
int64_t lastAccess;
int32_t queryId;
int32_t streamId;
int32_t numOfQueries;
int32_t numOfStreams;
SStreamDesc *pStreams;
SQueryDesc *pQueries;
int32_t id;
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time
int32_t pid; // pid of app that invokes taosc
uint32_t ip;
uint16_t port;
int8_t killed;
int64_t loginTimeMs;
int64_t lastAccessTimeMs;
int32_t queryId;
int32_t numOfQueries;
SQueryDesc *pQueries;
} SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime);
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
......@@ -54,16 +50,12 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg);
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg);
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveStreams(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
int32_t mndInitProfile(SMnode *pMnode) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -79,7 +71,6 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_HEARTBEAT, mndProcessHeartBeatMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONNECT, mndProcessConnectMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_QUERY, mndProcessKillQueryMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_STREAM, mndProcessKillStreamMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_CONN, mndProcessKillConnectionMsg);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
......@@ -88,9 +79,6 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndGetStreamMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStreams);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
return 0;
}
......@@ -103,46 +91,43 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime) {
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
if (startTime == 0) startTime = taosGetTimestampMs();
SConnObj connObj = {.pid = pid,
.appStartTime = startTime,
.id = connId,
SConnObj connObj = {.id = connId,
.appStartTimeMs = startTime,
.pid = pid,
.ip = pInfo->clientIp,
.port = pInfo->clientPort,
.killed = 0,
.port = port,
.ip = ip,
.stime = taosGetTimestampMs(),
.lastAccess = 0,
.loginTimeMs = taosGetTimestampMs(),
.lastAccessTimeMs = 0,
.queryId = 0,
.streamId = 0,
.numOfQueries = 0,
.numOfStreams = 0,
.pStreams = NULL,
.pQueries = NULL};
connObj.lastAccess = connObj.stime;
tstrncpy(connObj.user, user, TSDB_USER_LEN);
connObj.lastAccessTimeMs = connObj.loginTimeMs;
tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN);
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, data:%p failed to put into cache since %s, user:%s", connId, pConn, user, terrstr());
mError("conn:%d, data:%p failed to put into cache since %s, user:%s", connId, pConn, pInfo->user, terrstr());
return NULL;
} else {
mTrace("conn:%d, data:%p created, user:%s", pConn->id, pConn, user);
mTrace("conn:%d, data:%p created, user:%s", pConn->id, pConn, pInfo->user);
return pConn;
}
}
static void mndFreeConn(SConnObj *pConn) {
tfree(pConn->pQueries);
tfree(pConn->pStreams);
mTrace("conn:%d, data:%p destroyed", pConn->id, pConn);
}
......@@ -156,7 +141,7 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) {
}
int32_t keepTime = pMnode->cfg.shellActivityTimer * 3;
pConn->lastAccess = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
pConn->lastAccessTimeMs = keepTime * 1000 + (uint64_t)taosGetTimestampMs();
mTrace("conn:%d, data:%p acquired from cache", pConn->id, pConn);
return pConn;
......@@ -219,7 +204,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
mndReleaseDb(pMnode, pDb);
}
SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, pReq->startTime);
SConnObj *pConn = mndCreateConn(pMnode, &info, pReq->pid, pReq->app, pReq->startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr());
return -1;
......@@ -254,16 +239,14 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
pConn->numOfQueries = 0;
pConn->numOfStreams = 0;
int32_t numOfQueries = htonl(pMsg->numOfQueries);
int32_t numOfStreams = htonl(pMsg->numOfStreams);
if (numOfQueries > 0) {
if (pConn->pQueries == NULL) {
pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_STREAM_SAVE_SIZE);
pConn->pQueries = calloc(sizeof(SQueryDesc), QUERY_SAVE_SIZE);
}
pConn->numOfQueries = MIN(QUERY_STREAM_SAVE_SIZE, numOfQueries);
pConn->numOfQueries = MIN(QUERY_SAVE_SIZE, numOfQueries);
int32_t saveSize = pConn->numOfQueries * sizeof(SQueryDesc);
if (saveSize > 0 && pConn->pQueries != NULL) {
......@@ -271,19 +254,6 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) {
}
}
if (numOfStreams > 0) {
if (pConn->pStreams == NULL) {
pConn->pStreams = calloc(sizeof(SStreamDesc), QUERY_STREAM_SAVE_SIZE);
}
pConn->numOfStreams = MIN(QUERY_STREAM_SAVE_SIZE, numOfStreams);
int32_t saveSize = pConn->numOfStreams * sizeof(SStreamDesc);
if (saveSize > 0 && pConn->pStreams != NULL) {
memcpy(pConn->pStreams, pMsg->pData + numOfQueries * sizeof(SQueryDesc), saveSize);
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -303,7 +273,7 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, 0);
pConn = mndCreateConn(pMnode, &info, pReq->pid, pReq->app, 0);
if (pConn == NULL) {
mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr());
return -1;
......@@ -343,11 +313,6 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
pRsp->killConnection = 1;
}
if (pConn->streamId != 0) {
pRsp->streamId = htonl(pConn->streamId);
pConn->streamId = 0;
}
if (pConn->queryId != 0) {
pRsp->queryId = htonl(pConn->queryId);
pConn->queryId = 0;
......@@ -395,37 +360,6 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
}
}
static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
SKillStreamMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = htonl(pKill->connId);
int32_t streamId = htonl(pKill->streamId);
mDebug("kill stream msg is received, streamId:%d", streamId);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%d, failed to kill streamId:%d, conn not exist", connId, streamId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, streamId:%d is killed by user:%s", connId, streamId, pMsg->user);
pConn->streamId = streamId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
}
}
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -525,6 +459,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
......@@ -567,12 +502,12 @@ static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pConn->stime;
*(int64_t *)pWrite = pConn->loginTimeMs;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pConn->lastAccess < pConn->stime) pConn->lastAccess = pConn->stime;
*(int64_t *)pWrite = pConn->lastAccess;
if (pConn->lastAccessTimeMs < pConn->loginTimeMs) pConn->lastAccessTimeMs = pConn->loginTimeMs;
*(int64_t *)pWrite = pConn->lastAccessTimeMs;
cols++;
numOfRows++;
......@@ -623,7 +558,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 24;
pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "qid");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
......@@ -693,6 +628,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
pShow->numOfRows = 1000000;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type));
return 0;
}
......@@ -790,6 +726,7 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
}
}
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
return numOfRows;
}
......@@ -798,173 +735,3 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}
static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "streamId");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "connId");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "destination");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "exec");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "time(us)");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sql");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cycles");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htonl(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = 1000000;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
return 0;
}
static int32_t mndRetrieveStreams(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
SMnode *pMnode = pMsg->pMnode;
int32_t numOfRows = 0;
SConnObj *pConn = NULL;
int32_t cols = 0;
char *pWrite;
void *pIter;
char ipStr[TSDB_IPv4ADDR_LEN + 6];
while (numOfRows < rows) {
pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn);
if (pConn == NULL) {
pShow->pIter = pIter;
break;
}
if (numOfRows + pConn->numOfStreams >= rows) {
mndCancelGetNextConn(pMnode, pIter);
break;
}
pShow->pIter = pIter;
for (int32_t i = 0; i < pConn->numOfStreams; ++i) {
SStreamDesc *pDesc = pConn->pStreams + i;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->streamId);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pConn->id);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConn->ip), pConn->port);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->ctime);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->stime);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = htobe64(pDesc->useconds);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = (int32_t)htobe64(pDesc->num);
cols++;
numOfRows++;
}
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}
......@@ -138,7 +138,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
ShowMetaFp metaFp = pMgmt->metaFps[type];
if (metaFp == NULL) {
terrno = TSDB_CODE_MND_INVALID_MSG_TYPE;
mError("failed to process show-meta msg:%s since no message handle", mndShowStr(type));
mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr());
return -1;
}
......
......@@ -26,7 +26,7 @@ if $rows != 2 then
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data10 $data11 $data12
print $data20 $data11 $data22
print $data30 $data31 $data32
......@@ -38,7 +38,7 @@ if $rows != 3 then
endi
print $data00 $data01 $data02
print $data10 $data11 $data22
print $data10 $data11 $data12
print $data20 $data11 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
......
......@@ -3,8 +3,6 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c wallevel -v 0
system sh/exec.sh -n dnode1 -s start
sleep 2000
sql connect
$i = 0
......@@ -29,7 +27,7 @@ step12:
sql create user $user PASS 'taosdata'
sql show users
if $rows != 4 then
if $rows != 2 then
return -1
endi
......@@ -40,7 +38,7 @@ sql drop user $user -x step2
step2:
sql create user $user PASS '1'
sql show users
if $rows != 5 then
if $rows != 3 then
return -1
endi
......@@ -52,7 +50,7 @@ step3:
sql create user $user PASS 'abc0123456789'
sql show users
if $rows != 6 then
if $rows != 3 then
return -1
endi
......@@ -63,7 +61,7 @@ sql create user $user PASS 'abcd012345678901234567891234567890' -x step4
return -1
step4:
sql show users
if $rows != 6 then
if $rows != 4 then
return -1
endi
......@@ -75,7 +73,7 @@ while $i < 3
endw
sql show users
if $rows != 3 then
if $rows != 1 then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册