提交 50749639 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

...@@ -25,6 +25,7 @@ if(${BUILD_TEST}) ...@@ -25,6 +25,7 @@ if(${BUILD_TEST})
enable_testing() enable_testing()
endif(${BUILD_TEST}) endif(${BUILD_TEST})
add_subdirectory(source) add_subdirectory(source)
add_subdirectory(tools)
# docs # docs
add_subdirectory(docs) add_subdirectory(docs)
......
...@@ -783,12 +783,8 @@ typedef struct { ...@@ -783,12 +783,8 @@ typedef struct {
} SAuthVnodeMsg; } SAuthVnodeMsg;
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; int32_t vgId;
} SStbInfoMsg; char tableFname[TSDB_TABLE_FNAME_LEN];
typedef struct {
SMsgHead msgHead;
char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg; } STableInfoMsg;
typedef struct { typedef struct {
...@@ -799,10 +795,6 @@ typedef struct { ...@@ -799,10 +795,6 @@ typedef struct {
char tableNames[]; char tableNames[];
} SMultiTableInfoMsg; } SMultiTableInfoMsg;
typedef struct SSTableVgroupMsg {
int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct SVgroupInfo { typedef struct SVgroupInfo {
int32_t vgId; int32_t vgId;
uint32_t hashBegin; uint32_t hashBegin;
...@@ -812,12 +804,6 @@ typedef struct SVgroupInfo { ...@@ -812,12 +804,6 @@ typedef struct SVgroupInfo {
SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo; } SVgroupInfo;
typedef struct SVgroupListRspMsg {
int32_t vgroupNum;
int32_t vgroupVersion;
SVgroupInfo vgroupInfo[];
} SVgroupListRspMsg;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t numOfEps; int8_t numOfEps;
...@@ -839,8 +825,8 @@ typedef struct { ...@@ -839,8 +825,8 @@ typedef struct {
int8_t update; int8_t update;
int32_t sversion; int32_t sversion;
int32_t tversion; int32_t tversion;
uint64_t tuid;
uint64_t suid; uint64_t suid;
uint64_t tuid;
int32_t vgId; int32_t vgId;
SSchema pSchema[]; SSchema pSchema[];
} STableMetaMsg; } STableMetaMsg;
......
...@@ -34,6 +34,13 @@ enum OPERATOR_TYPE_E { ...@@ -34,6 +34,13 @@ enum OPERATOR_TYPE_E {
OP_TotalNum OP_TotalNum
}; };
enum DATASINK_TYPE_E {
DSINK_Unknown,
DSINK_Dispatch,
DSINK_Insert,
DSINK_TotalNum
};
struct SEpSet; struct SEpSet;
struct SQueryStmtInfo; struct SQueryStmtInfo;
...@@ -49,6 +56,22 @@ typedef struct SQueryNodeBasicInfo { ...@@ -49,6 +56,22 @@ typedef struct SQueryNodeBasicInfo {
const char *name; // operator name const char *name; // operator name
} SQueryNodeBasicInfo; } SQueryNodeBasicInfo;
typedef struct SDataSink {
SQueryNodeBasicInfo info;
SDataBlockSchema schema;
} SDataSink;
typedef struct SDataDispatcher {
SDataSink sink;
// todo
} SDataDispatcher;
typedef struct SDataInserter {
SDataSink sink;
uint64_t uid; // unique id of the table
// todo data field
} SDataInserter;
typedef struct SPhyNode { typedef struct SPhyNode {
SQueryNodeBasicInfo info; SQueryNodeBasicInfo info;
SArray *pTargets; // target list to be computed or scanned at this node SArray *pTargets; // target list to be computed or scanned at this node
...@@ -113,15 +136,16 @@ typedef struct SQueryDag { ...@@ -113,15 +136,16 @@ typedef struct SQueryDag {
*/ */
int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
int32_t qSetSuplanExecutionNode(SArray* subplans, SArray* nodes); int32_t qSetSuplanExecutionNode(SSubplan* subplan, SArray* nodes);
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
/** /**
* Convert to subplan to string for the scheduler to send to the executor * Convert to subplan to string for the scheduler to send to the executor
*/ */
int32_t qSubPlanToString(struct SSubplan *pPhyNode, char** str); int32_t qSubPlanToString(const SSubplan* subplan, char** str);
int32_t qStringToSubplan(const char* str, SSubplan** subplan);
/** /**
* Destroy the physical plan. * Destroy the physical plan.
......
...@@ -14,13 +14,13 @@ ...@@ -14,13 +14,13 @@
*/ */
#if defined(INCLUDE_AS_ENUM) // enum define mode #if defined(INCLUDE_AS_ENUM) // enum define mode
#undef OP_ENUM_MACRO #undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) OP_##op, #define OP_ENUM_MACRO(op) OP_##op,
#elif defined(INCLUDE_AS_NAME) // comment define mode #elif defined(INCLUDE_AS_NAME) // comment define mode
#undef OP_ENUM_MACRO #undef OP_ENUM_MACRO
#define OP_ENUM_MACRO(op) #op, #define OP_ENUM_MACRO(op) #op,
#else #else
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif #endif
OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(TableScan)
......
...@@ -24,8 +24,6 @@ extern "C" { ...@@ -24,8 +24,6 @@ extern "C" {
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
typedef SVgroupListRspMsg SVgroupListInfo;
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
......
...@@ -67,6 +67,8 @@ int32_t scheduleFetchRows(void *pJob, void *data); ...@@ -67,6 +67,8 @@ int32_t scheduleFetchRows(void *pJob, void *data);
*/ */
int32_t scheduleCancelJob(void *pJob); int32_t scheduleCancelJob(void *pJob);
void scheduleFreeJob(void *pJob);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -206,44 +206,28 @@ int32_t* taosGetErrno(); ...@@ -206,44 +206,28 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392) #define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392)
// mnode-stable // mnode-stable
#define TSDB_CODE_MND_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_STB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A1)
#define TSDB_CODE_MND_STB_INVALID_IGEXIST TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_TOO_MANY_STBS TAOS_DEF_ERROR_CODE(0, 0x03A2)
#define TSDB_CODE_MND_STB_INVALID_COLS_NUM TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_INVALID_STB TAOS_DEF_ERROR_CODE(0, 0x03A3)
#define TSDB_CODE_MND_STB_INVALID_TAGS_NUM TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x03A4)
#define TSDB_CODE_MND_STB_INVALID_COL_TYPE TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_STB_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03A5)
#define TSDB_CODE_MND_STB_INVALID_COL_ID TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x03A6)
#define TSDB_CODE_MND_STB_INVALID_COL_BYTES TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_TAG_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A7)
#define TSDB_CODE_MND_STB_INVALID_COL_NAME TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_TAG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A8)
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03A9)
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
#define TSDB_CODE_MND_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0361) //"Table name too long") #define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
#define TSDB_CODE_MND_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0362) //"Table does not exist") #define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
#define TSDB_CODE_MND_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0363) //"Invalid table type in tsdb")
#define TSDB_CODE_MND_TOO_MANY_TAGS TAOS_DEF_ERROR_CODE(0, 0x0364) //"Too many tags") // mnode-func
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x0365) //"Too many columns") #define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C0)
#define TSDB_CODE_MND_TOO_MANY_TIMESERIES TAOS_DEF_ERROR_CODE(0, 0x0366) //"Too many time series") #define TSDB_CODE_MND_FUNC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03C1)
#define TSDB_CODE_MND_NOT_SUPER_TABLE TAOS_DEF_ERROR_CODE(0, 0x0367) //"Not super table") // operation only available for super table #define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x03C2)
#define TSDB_CODE_MND_COL_NAME_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x0368) //"Tag name too long") #define TSDB_CODE_MND_INVALID_FUNC_NAME TAOS_DEF_ERROR_CODE(0, 0x03C3)
#define TSDB_CODE_MND_TAG_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0369) //"Tag already exists") #define TSDB_CODE_MND_INVALID_FUNC_COMMENT TAOS_DEF_ERROR_CODE(0, 0x03C4)
#define TSDB_CODE_MND_TAG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x036A) //"Tag does not exist") #define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5)
#define TSDB_CODE_MND_FIELD_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x036B) //"Field already exists") #define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6)
#define TSDB_CODE_MND_FIELD_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x036C) //"Field does not exist")
#define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) //"Super table does not exist")
#define TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG TAOS_DEF_ERROR_CODE(0, 0x036E) //"Invalid create table message")
#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F) //"Exceed max row bytes")
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0370)
#define TSDB_CODE_MND_FUNC_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0371)
#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0372)
#define TSDB_CODE_MND_INVALID_FUNC_NAME TAOS_DEF_ERROR_CODE(0, 0x0373)
#define TSDB_CODE_MND_INVALID_FUNC_COMMENT TAOS_DEF_ERROR_CODE(0, 0x0374)
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x0375)
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x0376)
#define TSDB_CODE_MND_INVALID_TAG_LENGTH TAOS_DEF_ERROR_CODE(0, 0x0376) //"invalid tag length")
#define TSDB_CODE_MND_INVALID_COLUMN_LENGTH TAOS_DEF_ERROR_CODE(0, 0x0377) //"invalid column length")
// dnode // dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
...@@ -506,6 +490,11 @@ int32_t* taosGetErrno(); ...@@ -506,6 +490,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error #define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error #define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error
//scheduler
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -38,7 +38,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -38,7 +38,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLES_META] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_TABLES_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg;
......
enable_testing()
add_subdirectory(acct) add_subdirectory(acct)
# add_subdirectory(auth) # add_subdirectory(auth)
# add_subdirectory(balance) # add_subdirectory(balance)
# add_subdirectory(cluster) add_subdirectory(cluster)
add_subdirectory(db) add_subdirectory(db)
add_subdirectory(dnode) add_subdirectory(dnode)
# add_subdirectory(func) # add_subdirectory(func)
# add_subdirectory(mnode) # add_subdirectory(mnode)
# add_subdirectory(profile) # add_subdirectory(profile)
# add_subdirectory(show) # add_subdirectory(show)
# add_subdirectory(stb) add_subdirectory(stb)
# add_subdirectory(sync) # add_subdirectory(sync)
# add_subdirectory(telem) # add_subdirectory(telem)
# add_subdirectory(trans) # add_subdirectory(trans)
......
...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_acct ...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_acct
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dnode_test_acct NAME dnode_test_acct
COMMAND dnode_test_acct COMMAND dnode_test_acct
......
/** /**
* @file vnodeApiTests.cpp * @file acct.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module acct-msg tests * @brief DNODE module acct-msg tests
* @version 0.1 * @version 0.1
......
add_executable(dndTestCluster "") add_executable(dnode_test_cluster "")
target_sources(dndTestCluster target_sources(dnode_test_cluster
PRIVATE PRIVATE
"cluster.cpp" "cluster.cpp"
"../sut/deploy.cpp" "../sut/deploy.cpp"
) )
target_link_libraries( target_link_libraries(
dndTestCluster dnode_test_cluster
PUBLIC dnode PUBLIC dnode
PUBLIC util PUBLIC util
PUBLIC os PUBLIC os
PUBLIC gtest_main PUBLIC gtest_main
) )
target_include_directories(dndTestCluster target_include_directories(dnode_test_cluster
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt" "${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dndTestCluster NAME dnode_test_cluster
COMMAND dndTestCluster COMMAND dnode_test_cluster
) )
/** /**
* @file vnodeApiTests.cpp * @file cluster.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module cluster-msg tests * @brief DNODE module cluster-msg tests
* @version 0.1 * @version 0.1
...@@ -13,154 +13,158 @@ ...@@ -13,154 +13,158 @@
class DndTestCluster : public ::testing::Test { class DndTestCluster : public ::testing::Test {
protected: protected:
void SetUp() override {} static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
void TearDown() override {} SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() { static void SetUpTestSuite() {
const char* user = "root"; initLog("/tmp/tdlog");
const char* pass = "taosdata";
const char* path = "/tmp/dndTestCluster";
const char* fqdn = "localhost";
uint16_t port = 9521;
pServer = createServer(path, fqdn, port); const char* fqdn = "localhost";
ASSERT(pServer); const char* firstEp = "localhost:9030";
pClient = createClient(user, pass, fqdn, port); pServer = CreateServer("/tmp/dnode_test_cluster", fqdn, 9030, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9030);
taosMsleep(1100);
} }
static void TearDownTestSuite() { static void TearDownTestSuite() {
stopServer(pServer); stopServer(pServer);
dropClient(pClient); dropClient(pClient);
pServer = NULL;
pClient = NULL;
} }
static SServer* pServer; static SServer* pServer;
static SClient* pClient; static SClient* pClient;
static int32_t connId; static int32_t connId;
};
SServer* DndTestCluster::pServer; public:
SClient* DndTestCluster::pClient; void SetUp() override {}
int32_t DndTestCluster::connId; void TearDown() override {}
TEST_F(DndTestCluster, ShowCluster) { void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
ASSERT_NE(pClient, nullptr); SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
int32_t showId = 0; pShow->type = showType;
if (db != NULL) {
{ strcpy(pShow->db, db);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); }
pReq->type = TSDB_MGMT_TABLE_CLUSTER; SRpcMsg showRpcMsg = {0};
strcpy(pReq->db, ""); showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
SRpcMsg rpcMsg = {0}; showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SShowMsg); sendMsg(pClient, &showRpcMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW; ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
sendMsg(pClient, &rpcMsg); ASSERT_NE(pClient->pRsp->pCont, nullptr);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr); SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
SShowRsp* pRsp = (SShowRsp*)pMsg->pCont; pShowRsp->showId = htonl(pShowRsp->showId);
ASSERT_NE(pRsp, nullptr); pMeta = &pShowRsp->tableMeta;
pRsp->showId = htonl(pRsp->showId); pMeta->numOfTags = htonl(pMeta->numOfTags);
STableMetaMsg* pMeta = &pRsp->tableMeta; pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->contLen = htonl(pMeta->contLen); pMeta->sversion = htonl(pMeta->sversion);
pMeta->numOfColumns = htons(pMeta->numOfColumns); pMeta->tversion = htonl(pMeta->tversion);
pMeta->sversion = htons(pMeta->sversion); pMeta->tuid = htobe64(pMeta->tuid);
pMeta->tversion = htons(pMeta->tversion);
pMeta->tid = htonl(pMeta->tid);
pMeta->uid = htobe64(pMeta->uid);
pMeta->suid = htobe64(pMeta->suid); pMeta->suid = htobe64(pMeta->suid);
showId = pRsp->showId; showId = pShowRsp->showId;
EXPECT_NE(pRsp->showId, 0); EXPECT_NE(pShowRsp->showId, 0);
EXPECT_EQ(pMeta->contLen, 0); EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_STREQ(pMeta->tbFname, "show cluster");
EXPECT_EQ(pMeta->numOfTags, 0); EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0); EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0); EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->numOfColumns, 3); EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0); EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0); EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tid, 0); EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->uid, 0);
EXPECT_STREQ(pMeta->sTableName, "");
EXPECT_EQ(pMeta->suid, 0); EXPECT_EQ(pMeta->suid, 0);
}
SSchema* pSchema = NULL; void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
pSchema = &pMeta->pSchema[0]; SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "id");
pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE); EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, "name"); EXPECT_STREQ(pSchema->name, name);
}
pSchema = &pMeta->pSchema[2]; void SendThenCheckShowRetrieveMsg(int32_t rows) {
pSchema->bytes = htons(pSchema->bytes); SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
EXPECT_EQ(pSchema->colId, 0); pRetrieve->showId = htonl(showId);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); pRetrieve->free = 0;
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "create_time"); SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
} }
{ void CheckInt32() {
SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); int32_t data = *((int32_t*)(pData + pos));
pReq->showId = htonl(showId);
pReq->free = 0;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SRetrieveTableMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 1);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0);
char* pData = pRsp->data;
int32_t pos = 0;
int32_t id = *((int32_t*)(pData + pos));
pos += sizeof(int32_t); pos += sizeof(int32_t);
EXPECT_GT(data, 0);
}
int32_t nameLen = varDataLen(pData + pos); void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(int32_t len) {
pos += sizeof(VarDataLenT); pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
}
char* name = (char*)(pData + pos); int32_t showId;
pos += TSDB_CLUSTER_ID_LEN; STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
int64_t create_time = *((int64_t*)(pData + pos)); SServer* DndTestCluster::pServer;
pos += sizeof(int64_t); SClient* DndTestCluster::pClient;
int32_t DndTestCluster::connId;
EXPECT_NE(id, 0); TEST_F(DndTestCluster, 01_ShowCluster) {
EXPECT_EQ(nameLen, 36); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_CLUSTER, "show cluster", 3, NULL);
EXPECT_STRNE(name, ""); CheckSchema(0, TSDB_DATA_TYPE_INT, 4, "id");
EXPECT_GT(create_time, 0); CheckSchema(1, TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, "name");
printf("--- id:%d nameLen:%d name:%s time:%" PRId64 " --- \n", id, nameLen, name, create_time); CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
}
SendThenCheckShowRetrieveMsg(1);
CheckInt32();
CheckBinary(TSDB_CLUSTER_ID_LEN);
CheckTimestamp();
} }
\ No newline at end of file
...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_db ...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_db
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dnode_test_db NAME dnode_test_db
COMMAND dnode_test_db COMMAND dnode_test_db
......
/** /**
* @file vnodeApiTests.cpp * @file db.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module db-msg tests * @brief DNODE module db-msg tests
* @version 0.1 * @version 0.1
...@@ -88,7 +88,7 @@ class DndTestDb : public ::testing::Test { ...@@ -88,7 +88,7 @@ class DndTestDb : public ::testing::Test {
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) { void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index]; SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type); EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes); EXPECT_EQ(pSchema->bytes, bytes);
...@@ -114,17 +114,14 @@ class DndTestDb : public ::testing::Test { ...@@ -114,17 +114,14 @@ class DndTestDb : public ::testing::Test {
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr); ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows); EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0); EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed); // EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0); EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0); EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data; pData = pRetrieveRsp->data;
...@@ -182,13 +179,13 @@ int32_t DndTestDb::connId; ...@@ -182,13 +179,13 @@ int32_t DndTestDb::connId;
TEST_F(DndTestDb, 01_ShowDb) { TEST_F(DndTestDb, 01_ShowDb) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups"); CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups");
CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica"); CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica");
CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum"); CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum");
CheckSchema(5, TSDB_DATA_TYPE_SMALLINT, 2, "days"); CheckSchema(5, TSDB_DATA_TYPE_SMALLINT, 2, "days");
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2"); CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2");
CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "cache(MB)"); CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "cache");
CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "blocks"); CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "blocks");
CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "minrows"); CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "minrows");
CheckSchema(10, TSDB_DATA_TYPE_INT, 4, "maxrows"); CheckSchema(10, TSDB_DATA_TYPE_INT, 4, "maxrows");
......
...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_dnode ...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_dnode
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dnode_test_dnode NAME dnode_test_dnode
COMMAND dnode_test_dnode COMMAND dnode_test_dnode
......
/** /**
* @file vnodeApiTests.cpp * @file dnode.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module dnode-msg tests * @brief DNODE module dnode-msg tests
* @version 0.1 * @version 0.1
...@@ -81,8 +81,8 @@ class DndTestDnode : public ::testing::Test { ...@@ -81,8 +81,8 @@ class DndTestDnode : public ::testing::Test {
pMeta->numOfTags = htonl(pMeta->numOfTags); pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns); pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion); pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htons(pMeta->tversion); pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htonl(pMeta->tuid); pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid); pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId; showId = pShowRsp->showId;
...@@ -102,7 +102,7 @@ class DndTestDnode : public ::testing::Test { ...@@ -102,7 +102,7 @@ class DndTestDnode : public ::testing::Test {
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) { void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index]; SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type); EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes); EXPECT_EQ(pSchema->bytes, bytes);
...@@ -128,17 +128,14 @@ class DndTestDnode : public ::testing::Test { ...@@ -128,17 +128,14 @@ class DndTestDnode : public ::testing::Test {
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr); ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows); EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0); EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed); // EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0); EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0); EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data; pData = pRetrieveRsp->data;
...@@ -187,12 +184,12 @@ SClient* DndTestDnode::pClient; ...@@ -187,12 +184,12 @@ SClient* DndTestDnode::pClient;
TEST_F(DndTestDnode, 01_ShowDnode) { TEST_F(DndTestDnode, 01_ShowDnode) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
CheckSchema(0, TSDB_DATA_TYPE_SMALLINT, 2, "id"); CheckSchema(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "end point"); CheckSchema(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes"); CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vnodes");
CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "max vnodes"); CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "max_vnodes");
CheckSchema(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status"); CheckSchema(4, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "status");
CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline reason"); CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline_reason");
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
CheckInt16(1); CheckInt16(1);
......
...@@ -21,8 +21,6 @@ target_include_directories(dndTestProfile ...@@ -21,8 +21,6 @@ target_include_directories(dndTestProfile
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dndTestProfile NAME dndTestProfile
COMMAND dndTestProfile COMMAND dndTestProfile
......
/** /**
* @file vnodeApiTests.cpp * @file profile.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module profile-msg tests * @brief DNODE module profile-msg tests
* @version 0.1 * @version 0.1
...@@ -216,17 +216,14 @@ TEST_F(DndTestProfile, SConnectMsg_03) { ...@@ -216,17 +216,14 @@ TEST_F(DndTestProfile, SConnectMsg_03) {
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont; SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont;
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->offset = htobe64(pRsp->offset);
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
EXPECT_EQ(pRsp->numOfRows, 1); EXPECT_EQ(pRsp->numOfRows, 1);
EXPECT_EQ(pRsp->offset, 0);
EXPECT_EQ(pRsp->useconds, 0); EXPECT_EQ(pRsp->useconds, 0);
EXPECT_EQ(pRsp->completed, 1); EXPECT_EQ(pRsp->completed, 1);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->compressed, 0); EXPECT_EQ(pRsp->compressed, 0);
EXPECT_EQ(pRsp->reserved, 0);
EXPECT_EQ(pRsp->compLen, 0); EXPECT_EQ(pRsp->compLen, 0);
} }
} }
...@@ -497,7 +494,7 @@ TEST_F(DndTestProfile, SKillQueryMsg_03) { ...@@ -497,7 +494,7 @@ TEST_F(DndTestProfile, SKillQueryMsg_03) {
EXPECT_STREQ(pSchema->name, "queryId"); EXPECT_STREQ(pSchema->name, "queryId");
pSchema = &pMeta->pSchema[1]; pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4); EXPECT_EQ(pSchema->bytes, 4);
......
add_executable(dndTestShow "") add_executable(dnode_test_show "")
target_sources(dndTestShow target_sources(dnode_test_show
PRIVATE PRIVATE
"show.cpp" "show.cpp"
"../sut/deploy.cpp" "../sut/deploy.cpp"
) )
target_link_libraries( target_link_libraries(
dndTestShow dnode_test_show
PUBLIC dnode PUBLIC dnode
PUBLIC util PUBLIC util
PUBLIC os PUBLIC os
PUBLIC gtest_main PUBLIC gtest_main
) )
target_include_directories(dndTestShow target_include_directories(dnode_test_show
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt" "${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dndTestShow NAME dnode_test_show
COMMAND dndTestShow COMMAND dnode_test_show
) )
/** /**
* @file vnodeApiTests.cpp * @file show.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module show-msg tests * @brief DNODE module show-msg tests
* @version 0.1 * @version 0.1
...@@ -151,49 +151,49 @@ TEST_F(DndTestShow, SShowMsg_04) { ...@@ -151,49 +151,49 @@ TEST_F(DndTestShow, SShowMsg_04) {
SSchema* pSchema = NULL; SSchema* pSchema = NULL;
pSchema = &pMeta->pSchema[0]; pSchema = &pMeta->pSchema[0];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4); EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "connId"); EXPECT_STREQ(pSchema->name, "connId");
pSchema = &pMeta->pSchema[1]; pSchema = &pMeta->pSchema[1];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "user"); EXPECT_STREQ(pSchema->name, "user");
pSchema = &pMeta->pSchema[2]; pSchema = &pMeta->pSchema[2];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "program"); EXPECT_STREQ(pSchema->name, "program");
pSchema = &pMeta->pSchema[3]; pSchema = &pMeta->pSchema[3];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
EXPECT_EQ(pSchema->bytes, 4); EXPECT_EQ(pSchema->bytes, 4);
EXPECT_STREQ(pSchema->name, "pid"); EXPECT_STREQ(pSchema->name, "pid");
pSchema = &pMeta->pSchema[4]; pSchema = &pMeta->pSchema[4];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE); EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE);
EXPECT_STREQ(pSchema->name, "ip:port"); EXPECT_STREQ(pSchema->name, "ip:port");
pSchema = &pMeta->pSchema[5]; pSchema = &pMeta->pSchema[5];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8); EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "login_time"); EXPECT_STREQ(pSchema->name, "login_time");
pSchema = &pMeta->pSchema[6]; pSchema = &pMeta->pSchema[6];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->bytes, 8); EXPECT_EQ(pSchema->bytes, 8);
......
add_executable(dnode_test_stb "")
target_sources(dnode_test_stb
PRIVATE
"stb.cpp"
"../sut/deploy.cpp"
)
target_link_libraries(
dnode_test_stb
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(dnode_test_stb
PUBLIC
"${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt"
"${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
"${CMAKE_CURRENT_SOURCE_DIR}/../sut"
)
add_test(
NAME dnode_test_stb
COMMAND dnode_test_stb
)
/**
* @file stb.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module db-msg tests
* @version 0.1
* @date 2021-12-17
*
* @copyright Copyright (c) 2021
*
*/
#include "deploy.h"
class DndTestStb : public ::testing::Test {
protected:
static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
SServer* pServer = createServer(path, fqdn, port, firstEp);
ASSERT(pServer);
return pServer;
}
static void SetUpTestSuite() {
initLog("/tmp/tdlog");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9101";
pServer = CreateServer("/tmp/dnode_test_stb", fqdn, 9101, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9101);
taosMsleep(1100);
}
static void TearDownTestSuite() {
stopServer(pServer);
dropClient(pClient);
pServer = NULL;
pClient = NULL;
}
static SServer* pServer;
static SClient* pClient;
static int32_t connId;
public:
void SetUp() override {}
void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType;
if (db != NULL) {
strcpy(pShow->db, db);
}
SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg);
showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW;
sendMsg(pClient, &showRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont;
ASSERT_NE(pShowRsp, nullptr);
pShowRsp->showId = htonl(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
showId = pShowRsp->showId;
EXPECT_NE(pShowRsp->showId, 0);
EXPECT_STREQ(pMeta->tbFname, showName);
EXPECT_EQ(pMeta->numOfTags, 0);
EXPECT_EQ(pMeta->numOfColumns, columns);
EXPECT_EQ(pMeta->precision, 0);
EXPECT_EQ(pMeta->tableType, 0);
EXPECT_EQ(pMeta->update, 0);
EXPECT_EQ(pMeta->sversion, 0);
EXPECT_EQ(pMeta->tversion, 0);
EXPECT_EQ(pMeta->tuid, 0);
EXPECT_EQ(pMeta->suid, 0);
}
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes);
EXPECT_STREQ(pSchema->name, name);
}
void SendThenCheckShowRetrieveMsg(int32_t rows) {
SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg));
pRetrieve->showId = htonl(showId);
pRetrieve->free = 0;
SRpcMsg retrieveRpcMsg = {0};
retrieveRpcMsg.pCont = pRetrieve;
retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg);
retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
sendMsg(pClient, &retrieveRpcMsg);
ASSERT_NE(pClient->pRsp, nullptr);
ASSERT_EQ(pClient->pRsp->code, 0);
ASSERT_NE(pClient->pRsp->pCont, nullptr);
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data;
pos = 0;
}
void CheckInt8(int8_t val) {
int8_t data = *((int8_t*)(pData + pos));
pos += sizeof(int8_t);
EXPECT_EQ(data, val);
}
void CheckInt16(int16_t val) {
int16_t data = *((int16_t*)(pData + pos));
pos += sizeof(int16_t);
EXPECT_EQ(data, val);
}
void CheckInt32(int32_t val) {
int32_t data = *((int32_t*)(pData + pos));
pos += sizeof(int32_t);
EXPECT_EQ(data, val);
}
void CheckInt64(int64_t val) {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_EQ(data, val);
}
void CheckTimestamp() {
int64_t data = *((int64_t*)(pData + pos));
pos += sizeof(int64_t);
EXPECT_GT(data, 0);
}
void CheckBinary(const char* val, int32_t len) {
pos += sizeof(VarDataLenT);
char* data = (char*)(pData + pos);
pos += len;
EXPECT_STREQ(data, val);
}
int32_t showId;
STableMetaMsg* pMeta;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
};
SServer* DndTestStb::pServer;
SClient* DndTestStb::pClient;
int32_t DndTestStb::connId;
TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replications = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->ignoreExist = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
{
int32_t cols = 2;
int32_t tags = 3;
int32_t size = (tags + cols) * sizeof(SSchema) + sizeof(SCreateStbMsg);
SCreateStbMsg* pReq = (SCreateStbMsg*)rpcMallocCont(size);
strcpy(pReq->name, "1.d1.stb");
pReq->numOfTags = htonl(tags);
pReq->numOfColumns = htonl(cols);
{
SSchema* pSchema = &pReq->pSchema[0];
pSchema->colId = htonl(0);
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema->name, "ts");
}
{
SSchema* pSchema = &pReq->pSchema[1];
pSchema->colId = htonl(1);
pSchema->bytes = htonl(4);
pSchema->type = TSDB_DATA_TYPE_INT;
strcpy(pSchema->name, "col1");
}
{
SSchema* pSchema = &pReq->pSchema[2];
pSchema->colId = htonl(2);
pSchema->bytes = htonl(2);
pSchema->type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema->name, "tag1");
}
{
SSchema* pSchema = &pReq->pSchema[3];
pSchema->colId = htonl(3);
pSchema->bytes = htonl(8);
pSchema->type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema->name, "tag2");
}
{
SSchema* pSchema = &pReq->pSchema[4];
pSchema->colId = htonl(4);
pSchema->bytes = htonl(16);
pSchema->type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema->name, "tag3");
}
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = size;
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_STB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(2, TSDB_DATA_TYPE_INT, 4, "columns");
CheckSchema(3, TSDB_DATA_TYPE_INT, 4, "tags");
SendThenCheckShowRetrieveMsg(1);
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
CheckTimestamp();
CheckInt32(2);
CheckInt32(3);
// ----- meta ------
{
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(sizeof(STableInfoMsg));
strcpy(pReq->tableFname, "1.d1.stb");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(STableInfoMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_TABLE_META;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
STableMetaMsg* pRsp = (STableMetaMsg*)pMsg->pCont;
pRsp->numOfTags = htonl(pRsp->numOfTags);
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
pRsp->sversion = htonl(pRsp->sversion);
pRsp->tversion = htonl(pRsp->tversion);
pRsp->suid = htobe64(pRsp->suid);
pRsp->tuid = htobe64(pRsp->tuid);
pRsp->vgId = htobe64(pRsp->vgId);
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
SSchema* pSchema = &pRsp->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
}
EXPECT_STREQ(pRsp->tbFname, "");
EXPECT_STREQ(pRsp->stbFname, "1.d1.stb");
EXPECT_EQ(pRsp->numOfColumns, 2);
EXPECT_EQ(pRsp->numOfTags, 3);
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
EXPECT_EQ(pRsp->update, 0);
EXPECT_EQ(pRsp->sversion, 1);
EXPECT_EQ(pRsp->tversion, 0);
EXPECT_GT(pRsp->suid, 0);
EXPECT_EQ(pRsp->tuid, 0);
EXPECT_EQ(pRsp->vgId, 0);
{
SSchema* pSchema = &pRsp->pSchema[0];
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->bytes, 8);
EXPECT_STREQ(pSchema->name, "ts");
}
}
// restart
stopServer(pServer);
pServer = NULL;
uInfo("start all server");
const char* fqdn = "localhost";
const char* firstEp = "localhost:9101";
pServer = startServer("/tmp/dnode_test_stb", fqdn, 9101, firstEp);
uInfo("all server is running");
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
SendThenCheckShowRetrieveMsg(1);
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
CheckTimestamp();
CheckInt32(2);
CheckInt32(3);
{
SDropStbMsg* pReq = (SDropStbMsg*)rpcMallocCont(sizeof(SDropStbMsg));
strcpy(pReq->name, "1.d1.stb");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropStbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_STB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_STB, "show stables", 4, "1.d1");
SendThenCheckShowRetrieveMsg(0);
}
...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_user ...@@ -21,8 +21,6 @@ target_include_directories(dnode_test_user
"${CMAKE_CURRENT_SOURCE_DIR}/../sut" "${CMAKE_CURRENT_SOURCE_DIR}/../sut"
) )
enable_testing()
add_test( add_test(
NAME dnode_test_user NAME dnode_test_user
COMMAND dnode_test_user COMMAND dnode_test_user
......
/** /**
* @file vnodeApiTests.cpp * @file user.cpp
* @author slguan (slguan@taosdata.com) * @author slguan (slguan@taosdata.com)
* @brief DNODE module user-msg tests * @brief DNODE module user-msg tests
* @version 0.1 * @version 0.1
...@@ -87,7 +87,7 @@ class DndTestUser : public ::testing::Test { ...@@ -87,7 +87,7 @@ class DndTestUser : public ::testing::Test {
void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) { void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) {
SSchema* pSchema = &pMeta->pSchema[index]; SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htons(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
EXPECT_EQ(pSchema->colId, 0); EXPECT_EQ(pSchema->colId, 0);
EXPECT_EQ(pSchema->type, type); EXPECT_EQ(pSchema->type, type);
EXPECT_EQ(pSchema->bytes, bytes); EXPECT_EQ(pSchema->bytes, bytes);
...@@ -113,17 +113,14 @@ class DndTestUser : public ::testing::Test { ...@@ -113,17 +113,14 @@ class DndTestUser : public ::testing::Test {
pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont;
ASSERT_NE(pRetrieveRsp, nullptr); ASSERT_NE(pRetrieveRsp, nullptr);
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
pRetrieveRsp->offset = htobe64(pRetrieveRsp->offset);
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
EXPECT_EQ(pRetrieveRsp->numOfRows, rows); EXPECT_EQ(pRetrieveRsp->numOfRows, rows);
EXPECT_EQ(pRetrieveRsp->offset, 0);
EXPECT_EQ(pRetrieveRsp->useconds, 0); EXPECT_EQ(pRetrieveRsp->useconds, 0);
// EXPECT_EQ(pRetrieveRsp->completed, completed); // EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(pRetrieveRsp->compressed, 0); EXPECT_EQ(pRetrieveRsp->compressed, 0);
EXPECT_EQ(pRetrieveRsp->reserved, 0);
EXPECT_EQ(pRetrieveRsp->compLen, 0); EXPECT_EQ(pRetrieveRsp->compLen, 0);
pData = pRetrieveRsp->data; pData = pRetrieveRsp->data;
...@@ -170,7 +167,7 @@ TEST_F(DndTestUser, 01_ShowUser) { ...@@ -170,7 +167,7 @@ TEST_F(DndTestUser, 01_ShowUser) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "name"); CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "privilege"); CheckSchema(1, TSDB_DATA_TYPE_BINARY, 10 + VARSTR_HEADER_SIZE, "privilege");
CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); CheckSchema(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
CheckSchema(3, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "account"); CheckSchema(3, TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN + VARSTR_HEADER_SIZE, "account");
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
......
...@@ -747,7 +747,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe ...@@ -747,7 +747,7 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
pShow->bytes[cols] = 2; pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vgroups"); strcpy(pSchema[cols].name, "vgroups");
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 2; pShow->bytes[cols] = 2;
......
...@@ -291,7 +291,7 @@ char *mndShowStr(int32_t showType) { ...@@ -291,7 +291,7 @@ char *mndShowStr(int32_t showType) {
case TSDB_MGMT_TABLE_VNODES: case TSDB_MGMT_TABLE_VNODES:
return "show vnodes"; return "show vnodes";
case TSDB_MGMT_TABLE_CLUSTER: case TSDB_MGMT_TABLE_CLUSTER:
return "show clusters"; return "show cluster";
case TSDB_MGMT_TABLE_STREAMTABLES: case TSDB_MGMT_TABLE_STREAMTABLES:
return "show streamtables"; return "show streamtables";
case TSDB_MGMT_TABLE_TP: case TSDB_MGMT_TABLE_TP:
......
...@@ -80,7 +80,7 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { ...@@ -80,7 +80,7 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->updateTime) SDB_SET_INT64(pRaw, dataPos, pStb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pStb->uid) SDB_SET_INT64(pRaw, dataPos, pStb->uid)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid) SDB_SET_INT64(pRaw, dataPos, pStb->dbUid)
SDB_SET_INT64(pRaw, dataPos, pStb->version) SDB_SET_INT32(pRaw, dataPos, pStb->version)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags)
...@@ -157,11 +157,17 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb ...@@ -157,11 +157,17 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb
atomic_exchange_32(&pOldStb->version, pNewStb->version); atomic_exchange_32(&pOldStb->version, pNewStb->version);
taosWLockLatch(&pOldStb->lock); taosWLockLatch(&pOldStb->lock);
pOldStb->numOfColumns = pNewStb->numOfColumns;
pOldStb->numOfTags = pNewStb->numOfTags;
int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns; int32_t totalCols = pNewStb->numOfTags + pNewStb->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema); int32_t totalSize = totalCols * sizeof(SSchema);
if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) { if (pOldStb->numOfTags + pOldStb->numOfColumns < totalCols) {
pOldStb->pSchema = malloc(totalSize); void *pSchema = malloc(totalSize);
if (pSchema != NULL) {
free(pOldStb->pSchema);
pOldStb->pSchema = pSchema;
}
} }
memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize); memcpy(pOldStb->pSchema, pNewStb->pSchema, totalSize);
...@@ -200,37 +206,37 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) { ...@@ -200,37 +206,37 @@ static int32_t mndCheckStbMsg(SCreateStbMsg *pCreate) {
} }
if (pCreate->igExists < 0 || pCreate->igExists > 1) { if (pCreate->igExists < 0 || pCreate->igExists > 1) {
terrno = TSDB_CODE_MND_STB_INVALID_IGEXIST; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfColumns > TSDB_MAX_COLUMNS) { if (pCreate->numOfColumns < TSDB_MIN_COLUMNS || pCreate->numOfColumns > TSDB_MAX_COLUMNS) {
terrno = TSDB_CODE_MND_STB_INVALID_COLS_NUM; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) { if (pCreate->numOfTags <= 0 || pCreate->numOfTags > TSDB_MAX_TAGS) {
terrno = TSDB_CODE_MND_STB_INVALID_TAGS_NUM; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS); int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS);
for (int32_t i = 0; i < totalCols; ++i) { for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pCreate->pSchema[i]; SSchema *pSchema = &pCreate->pSchema[i];
if (pSchema->type <= 0) { if (pSchema->type < 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_TYPE; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->colId < 0 || pSchema->colId >= maxColId) { if (pSchema->colId < 0 || pSchema->colId >= maxColId) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_ID; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->bytes <= 0) { if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_BYTES; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->name[0] == 0) { if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_NAME; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
} }
...@@ -245,6 +251,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre ...@@ -245,6 +251,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre
stbObj.createdTime = taosGetTimestampMs(); stbObj.createdTime = taosGetTimestampMs();
stbObj.updateTime = stbObj.createdTime; stbObj.updateTime = stbObj.createdTime;
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
stbObj.dbUid = pDb->uid;
stbObj.version = 1; stbObj.version = 1;
stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags; stbObj.numOfTags = pCreate->numOfTags;
...@@ -350,19 +357,19 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) { ...@@ -350,19 +357,19 @@ static int32_t mndCheckAlterStbMsg(SAlterStbMsg *pAlter) {
pSchema->bytes = htonl(pSchema->bytes); pSchema->bytes = htonl(pSchema->bytes);
if (pSchema->type <= 0) { if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_TYPE; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) { if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_ID; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->bytes <= 0) { if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_BYTES; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->name[0] == 0) { if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_STB_INVALID_COL_NAME; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -480,31 +487,37 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { ...@@ -480,31 +487,37 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SStbInfoMsg *pInfo = pMsg->rpcMsg.pCont; STableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
mDebug("stb:%s, start to retrieve meta", pInfo->name); mDebug("stb:%s, start to retrieve meta", pInfo->tableFname);
SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->name); SDbObj *pDb = mndAcquireDbByStb(pMnode, pInfo->tableFname);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED; terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to retrieve meta since %s", pInfo->name, terrstr()); mError("stb:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr());
return -1; return -1;
} }
SStbObj *pStb = mndAcquireStb(pMnode, pInfo->name); SStbObj *pStb = mndAcquireStb(pMnode, pInfo->tableFname);
if (pStb == NULL) { if (pStb == NULL) {
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_TABLE_NAME; terrno = TSDB_CODE_MND_INVALID_STB;
mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr()); mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1; return -1;
} }
int32_t contLen = sizeof(STableMetaMsg) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema); taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t contLen = sizeof(STableMetaMsg) + totalCols * sizeof(SSchema);
STableMetaMsg *pMeta = rpcMallocCont(contLen); STableMetaMsg *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) { if (pMeta == NULL) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stb:%s, failed to get meta since %s", pInfo->name, terrstr()); mError("stb:%s, failed to get meta since %s", pInfo->tableFname, terrstr());
return -1; return -1;
} }
...@@ -517,7 +530,7 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { ...@@ -517,7 +530,7 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pMeta->sversion = htonl(pStb->version); pMeta->sversion = htonl(pStb->version);
pMeta->suid = htonl(pStb->uid); pMeta->suid = htonl(pStb->uid);
for (int32_t i = 0; i < pStb->numOfColumns; ++i) { for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pMeta->pSchema[i]; SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i]; SSchema *pSrcSchema = &pStb->pSchema[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
...@@ -525,11 +538,14 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) { ...@@ -525,11 +538,14 @@ static int32_t mndProcessStbMetaMsg(SMnodeMsg *pMsg) {
pSchema->colId = htonl(pSrcSchema->colId); pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes); pSchema->bytes = htonl(pSrcSchema->bytes);
} }
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
pMsg->pCont = pMeta; pMsg->pCont = pMeta;
pMsg->contLen = contLen; pMsg->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->name, pStb->numOfColumns, pStb->numOfTags); mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pStb->numOfColumns, pStb->numOfTags);
return 0; return 0;
} }
...@@ -546,7 +562,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs ...@@ -546,7 +562,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pStb); pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break; if (pIter == NULL) break;
if (strcmp(pStb->db, dbName) == 0) { if (strcmp(pStb->db, dbName) == 0) {
...@@ -583,14 +599,14 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM ...@@ -583,14 +599,14 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM
pSchema[cols].bytes = htonl(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 2; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns"); strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htonl(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 2; pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags"); strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htonl(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
...@@ -603,6 +619,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM ...@@ -603,6 +619,7 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
} }
pShow->numOfRows = sdbGetSize(pSdb, SDB_STB);
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
strcpy(pMeta->tbFname, mndShowStr(pShow->type)); strcpy(pMeta->tbFname, mndShowStr(pShow->type));
...@@ -646,8 +663,8 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -646,8 +663,8 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols = 0; cols = 0;
char stbName[TSDB_TABLE_FNAME_LEN] = {0}; char stbName[TSDB_TABLE_NAME_LEN] = {0};
memcpy(stbName, pStb->name + prefixLen, TSDB_TABLE_FNAME_LEN - prefixLen); tstrncpy(stbName, pStb->name + prefixLen, TSDB_TABLE_NAME_LEN);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, stbName); STR_TO_VARSTR(pWrite, stbName);
cols++; cols++;
...@@ -657,11 +674,11 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -657,11 +674,11 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfColumns; *(int32_t *)pWrite = pStb->numOfColumns;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pStb->numOfTags; *(int32_t *)pWrite = pStb->numOfTags;
cols++; cols++;
numOfRows++; numOfRows++;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndTrans.h"
int32_t mndInitSync(SMnode *pMnode) { return 0; }
void mndCleanupSync(SMnode *pMnode) {}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
int32_t code = 0;
// int32_t len = sdbGetRawTotalSize(pRaw);
// SSdbRaw *pReceived = calloc(1, len);
// memcpy(pReceived, pRaw, len);
// mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
// mndTransApply(pMnode, pReceived, code);
return code;
}
bool mndIsMaster(SMnode *pMnode) { return true; }
\ No newline at end of file
...@@ -421,7 +421,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p ...@@ -421,7 +421,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p
pShow->bytes[cols] = 8; pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create time"); strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]); pSchema[cols].bytes = htonl(pShow->bytes[cols]);
cols++; cols++;
......
...@@ -19,5 +19,5 @@ target_link_libraries( ...@@ -19,5 +19,5 @@ target_link_libraries(
# test # test
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) #add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
\ No newline at end of file
...@@ -77,6 +77,14 @@ typedef struct SInsertParseContext { ...@@ -77,6 +77,14 @@ typedef struct SInsertParseContext {
SInsertStmtInfo* pOutput; SInsertStmtInfo* pOutput;
} SInsertParseContext; } SInsertParseContext;
typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param);
typedef struct SKvParam {
char buf[TSDB_MAX_TAGS_LEN];
SKVRowBuilder* builder;
SSchema* schema;
} SKvParam;
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
...@@ -300,14 +308,6 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec ...@@ -300,14 +308,6 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param);
typedef struct SKvParam {
char buf[TSDB_MAX_TAGS_LEN];
SKVRowBuilder* builder;
SSchema* schema;
} SKvParam;
static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) { static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*)param; SKvParam* pa = (SKvParam*)param;
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
......
...@@ -8,7 +8,7 @@ target_include_directories( ...@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries( target_link_libraries(
planner planner
PRIVATE os util common catalog parser transport function query PRIVATE os util common cjson catalog parser transport function query
) )
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
...@@ -101,13 +101,7 @@ int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); ...@@ -101,13 +101,7 @@ int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
/** int32_t subPlanToString(const SSubplan *pPhyNode, char** str);
* Convert to physical plan to string to enable to print it out in the shell.
* @param pPhyNode
* @param str
* @return
*/
int32_t phyPlanToString(struct SPhyNode *pPhyNode, char** str);
/** /**
* Destroy the query plan object. * Destroy the query plan object.
......
...@@ -215,7 +215,3 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD ...@@ -215,7 +215,3 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
*pDag = context.pDag; *pDag = context.pDag;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t subPlanToString(struct SSubplan *pPhyNode, char** str) {
return TSDB_CODE_SUCCESS;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "plannerInt.h"
#include "parser.h"
#include "cJSON.h"
typedef cJSON* (*FToObj)(const void* obj);
static bool addObject(cJSON* json, const char* name, FToObj func, const void* obj) {
if (NULL == obj) {
return true;
}
cJSON* jObj = func(obj);
if (NULL == jObj) {
return false;
}
return cJSON_AddItemToObject(json, name, jObj);
}
static bool addItem(cJSON* json, FToObj func, const void* item) {
cJSON* jItem = func(item);
if (NULL == jItem) {
return false;
}
return cJSON_AddItemToArray(json, jItem);
}
static bool addArray(cJSON* json, const char* name, FToObj func, const SArray* array) {
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name);
if (NULL == jArray) {
return false;
}
for (size_t i = 0; i < size; ++i) {
if (!addItem(jArray, func, taosArrayGetP(array, i))) {
return false;
}
}
}
return true;
}
static bool addRawArray(cJSON* json, const char* name, FToObj func, const void* array, int32_t itemSize, int32_t size) {
if (size > 0) {
cJSON* jArray = cJSON_AddArrayToObject(json, name);
if (NULL == jArray) {
return false;
}
for (size_t i = 0; i < size; ++i) {
if (!addItem(jArray, func, (const char*)array + itemSize * i)) {
return false;
}
}
}
return true;
}
static cJSON* schemaToJson(const void* obj) {
const SSlotSchema* schema = (const SSlotSchema*)obj;
cJSON* jSchema = cJSON_CreateObject();
if (NULL == jSchema) {
return NULL;
}
// The 'name' field do not need to be serialized.
bool res = cJSON_AddNumberToObject(jSchema, "Type", schema->type);
if (res) {
res = cJSON_AddNumberToObject(jSchema, "ColId", schema->colId);
}
if (res) {
res = cJSON_AddNumberToObject(jSchema, "Bytes", schema->bytes);
}
if (!res) {
cJSON_Delete(jSchema);
return NULL;
}
return jSchema;
}
static cJSON* columnFilterInfoToJson(const void* obj) {
const SColumnFilterInfo* filter = (const SColumnFilterInfo*)obj;
cJSON* jFilter = cJSON_CreateObject();
if (NULL == jFilter) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jFilter, "LowerRelOptr", filter->lowerRelOptr);
if (res) {
res = cJSON_AddNumberToObject(jFilter, "UpperRelOptr", filter->upperRelOptr);
}
if (res) {
res = cJSON_AddNumberToObject(jFilter, "Filterstr", filter->filterstr);
}
if (res) {
res = cJSON_AddNumberToObject(jFilter, "LowerBnd", filter->lowerBndd);
}
if (res) {
res = cJSON_AddNumberToObject(jFilter, "UpperBnd", filter->upperBndd);
}
if (!res) {
cJSON_Delete(jFilter);
return NULL;
}
return jFilter;
}
static cJSON* columnInfoToJson(const void* obj) {
const SColumnInfo* col = (const SColumnInfo*)obj;
cJSON* jCol = cJSON_CreateObject();
if (NULL == jCol) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jCol, "ColId", col->colId);
if (res) {
res = cJSON_AddNumberToObject(jCol, "Type", col->type);
}
if (res) {
res = cJSON_AddNumberToObject(jCol, "Bytes", col->bytes);
}
if (res) {
res = addRawArray(jCol, "FilterList", columnFilterInfoToJson, col->flist.filterInfo, sizeof(SColumnFilterInfo), col->flist.numOfFilters);
}
if (!res) {
cJSON_Delete(jCol);
return NULL;
}
return jCol;
}
static cJSON* columnToJson(const void* obj) {
const SColumn* col = (const SColumn*)obj;
cJSON* jCol = cJSON_CreateObject();
if (NULL == jCol) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jCol, "TableId", col->uid);
if (res) {
res = cJSON_AddNumberToObject(jCol, "Flag", col->flag);
}
if (res) {
res = addObject(jCol, "Info", columnInfoToJson, &col->info);
}
if (!res) {
cJSON_Delete(jCol);
return NULL;
}
return jCol;
}
static cJSON* exprNodeToJson(const void* obj);
static cJSON* operatorToJson(const void* obj) {
const tExprNode* exprInfo = (const tExprNode*)obj;
cJSON* jOper = cJSON_CreateObject();
if (NULL == jOper) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jOper, "Oper", exprInfo->_node.optr);
if (res) {
res = addObject(jOper, "Left", exprNodeToJson, exprInfo->_node.pLeft);
}
if (res) {
res = addObject(jOper, "Right", exprNodeToJson, exprInfo->_node.pRight);
}
if (!res) {
cJSON_Delete(jOper);
return NULL;
}
return jOper;
}
static cJSON* functionToJson(const void* obj) {
const tExprNode* exprInfo = (const tExprNode*)obj;
cJSON* jFunc = cJSON_CreateObject();
if (NULL == jFunc) {
return NULL;
}
bool res = cJSON_AddStringToObject(jFunc, "Name", exprInfo->_function.functionName);
if (res) {
res = addRawArray(jFunc, "Child", exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*), exprInfo->_function.num);
}
if (!res) {
cJSON_Delete(jFunc);
return NULL;
}
return jFunc;
}
static cJSON* variantToJson(const void* obj) {
const SVariant* var = (const SVariant*)obj;
cJSON* jVar = cJSON_CreateObject();
if (NULL == jVar) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jVar, "Type", var->nType);
if (res) {
res = cJSON_AddNumberToObject(jVar, "Len", var->nLen);
}
if (res) {
if (0/* in */) {
res = addArray(jVar, "values", variantToJson, var->arr);
} else if (IS_NUMERIC_TYPE(var->nType)) {
res = cJSON_AddNumberToObject(jVar, "Value", var->d);
} else {
res = cJSON_AddStringToObject(jVar, "Value", var->pz);
}
}
if (!res) {
cJSON_Delete(jVar);
return NULL;
}
return jVar;
}
static cJSON* exprNodeToJson(const void* obj) {
const tExprNode* exprInfo = (const tExprNode*)obj;
cJSON* jExprInfo = cJSON_CreateObject();
if (NULL == jExprInfo) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jExprInfo, "Type", exprInfo->nodeType);
if (res) {
switch (exprInfo->nodeType) {
case TEXPR_BINARYEXPR_NODE:
case TEXPR_UNARYEXPR_NODE:
res = addObject(jExprInfo, "Operator", operatorToJson, exprInfo);
break;
case TEXPR_FUNCTION_NODE:
res = addObject(jExprInfo, "Function", functionToJson, exprInfo);
break;
case TEXPR_COL_NODE:
res = addObject(jExprInfo, "Column", schemaToJson, exprInfo->pSchema);
break;
case TEXPR_VALUE_NODE:
res = addObject(jExprInfo, "Value", variantToJson, exprInfo->pVal);
break;
default:
res = false;
break;
}
}
if (!res) {
cJSON_Delete(jExprInfo);
return NULL;
}
return jExprInfo;
}
static cJSON* sqlExprToJson(const void* obj) {
const SSqlExpr* expr = (const SSqlExpr*)obj;
cJSON* jExpr = cJSON_CreateObject();
if (NULL == jExpr) {
return NULL;
}
// token does not need to be serialized.
bool res = addObject(jExpr, "Schema", schemaToJson, &expr->resSchema);
if (res) {
res = addRawArray(jExpr, "Columns", columnToJson, expr->pColumns, sizeof(SColumn), expr->numOfCols);
}
if (res) {
res = cJSON_AddNumberToObject(jExpr, "InterBytes", expr->interBytes);
}
if (res) {
res = addRawArray(jExpr, "Params", variantToJson, expr->param, sizeof(SVariant), expr->numOfParams);
}
if (!res) {
cJSON_Delete(jExpr);
return NULL;
}
return jExpr;
}
static cJSON* exprInfoToJson(const void* obj) {
const SExprInfo* exprInfo = (const SExprInfo*)obj;
cJSON* jExprInfo = cJSON_CreateObject();
if (NULL == jExprInfo) {
return NULL;
}
bool res = addObject(jExprInfo, "Base", sqlExprToJson, &exprInfo->base);
if (res) {
res = addObject(jExprInfo, "Expr", exprNodeToJson, exprInfo->pExpr);
}
if (!res) {
cJSON_Delete(jExprInfo);
return NULL;
}
return jExprInfo;
}
static cJSON* phyNodeToJson(const void* obj) {
const SPhyNode* phyNode = (const SPhyNode*)obj;
cJSON* jNode = cJSON_CreateObject();
if (NULL == jNode) {
return NULL;
}
// The 'pParent' field do not need to be serialized.
bool res = cJSON_AddStringToObject(jNode, "Name", phyNode->info.name);
if (res) {
res = addArray(jNode, "Targets", exprInfoToJson, phyNode->pTargets);
}
if (res) {
res = addArray(jNode, "Conditions", exprInfoToJson, phyNode->pConditions);
}
if (res) {
res = addRawArray(jNode, "Schema", schemaToJson, phyNode->targetSchema.pSchema, sizeof(SSlotSchema), phyNode->targetSchema.numOfCols);
}
if (res) {
res = addArray(jNode, "Children", phyNodeToJson, phyNode->pChildren);
}
if (!res) {
cJSON_Delete(jNode);
return NULL;
}
return jNode;
}
static cJSON* subplanIdToJson(const void* obj) {
const SSubplanId* id = (const SSubplanId*)obj;
cJSON* jId = cJSON_CreateObject();
if (NULL == jId) {
return NULL;
}
bool res = cJSON_AddNumberToObject(jId, "QueryId", id->queryId);
if (res) {
res = cJSON_AddNumberToObject(jId, "TemplateId", id->templateId);
}
if (res) {
res = cJSON_AddNumberToObject(jId, "SubplanId", id->subplanId);
}
if (!res) {
cJSON_Delete(jId);
return NULL;
}
return jId;
}
static cJSON* subplanToJson(const SSubplan* subplan) {
cJSON* jSubplan = cJSON_CreateObject();
if (NULL == jSubplan) {
return NULL;
}
// The 'type', 'level', 'execEpSet', 'pChildern' and 'pParents' fields do not need to be serialized.
bool res = addObject(jSubplan, "Id", subplanIdToJson, &subplan->id);
if (res) {
res = addObject(jSubplan, "Node", phyNodeToJson, subplan->pNode);
}
if (!res) {
cJSON_Delete(jSubplan);
return NULL;
}
return jSubplan;
}
int32_t subPlanToString(const SSubplan* subplan, char** str) {
cJSON* json = subplanToJson(subplan);
if (NULL == json) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
*str = cJSON_Print(json);
return TSDB_CODE_SUCCESS;
}
int32_t stringToSubplan(const char* str, SSubplan** subplan) {
// todo
return TSDB_CODE_SUCCESS;
}
...@@ -41,3 +41,11 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* ...@@ -41,3 +41,11 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
destroyQueryPlan(logicPlan); destroyQueryPlan(logicPlan);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qSubPlanToString(const SSubplan *subplan, char** str) {
return subPlanToString(subplan, str);
}
int32_t qStringToSubplan(const char* str, SSubplan** subplan) {
return stringToSubplan(str, subplan);
}
...@@ -39,7 +39,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3 ...@@ -39,7 +39,7 @@ int32_t queryBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int3
STableInfoMsg *bMsg = (STableInfoMsg *)*msg; STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
bMsg->msgHead.vgId = bInput->vgId; bMsg->vgId = bInput->vgId;
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname)); strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0; bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
......
...@@ -38,12 +38,13 @@ enum { ...@@ -38,12 +38,13 @@ enum {
}; };
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId;
SHashObj *Jobs; // key: queryId, value: SQueryJob* SHashObj *Jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SQueryTask { typedef struct SQueryTask {
uint64_t taskId; // task id uint64_t taskId; // task id
char *pSubplan; // operator tree char *msg; // operator tree
int8_t status; // task status int8_t status; // task status
SQueryProfileSummary summary; // task execution summary SQueryProfileSummary summary; // task execution summary
} SQueryTask; } SQueryTask;
...@@ -68,9 +69,12 @@ typedef struct SQueryJob { ...@@ -68,9 +69,12 @@ typedef struct SQueryJob {
} SQueryJob; } SQueryJob;
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -51,16 +51,17 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ ...@@ -51,16 +51,17 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
} }
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
int32_t code = 0;
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
if (levelNum <= 0) { if (levelNum <= 0) {
qError("invalid level num:%d", levelNum); qError("invalid level num:%d", levelNum);
return TSDB_CODE_QRY_INVALID_INPUT; SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
if (NULL == job->levels) { if (NULL == job->levels) {
qError("taosArrayInit %d failed", levelNum); qError("taosArrayInit %d failed", levelNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY; SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
job->levelNum = levelNum; job->levelNum = levelNum;
...@@ -73,28 +74,66 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -73,28 +74,66 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SArray *levelPlans = NULL; SArray *levelPlans = NULL;
int32_t levelPlanNum = 0; int32_t levelPlanNum = 0;
level.status = SCH_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) { for (int32_t i = 0; i < levelNum; ++i) {
levelPlans = taosArrayGetP(dag->pSubplans, i); levelPlans = taosArrayGetP(dag->pSubplans, i);
if (NULL == levelPlans) { if (NULL == levelPlans) {
qError("no level plans for level %d", i); qError("no level plans for level %d", i);
return TSDB_CODE_QRY_INVALID_INPUT; SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
if (levelPlanNum <= 0) { if (levelPlanNum <= 0) {
qError("invalid level plans number:%d, level:%d", levelPlanNum, i); qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
return TSDB_CODE_QRY_INVALID_INPUT; SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
level.taskNum = levelPlanNum;
level.subPlans = levelPlans;
level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
if (NULL == level.subTasks) {
qError("taosArrayInit %d failed", levelPlanNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
for (int32_t n = 0; n < levelPlanNum; ++n) { for (int32_t n = 0; n < levelPlanNum; ++n) {
SQueryTask *task = taosArrayGet(level.subTasks, n);
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task->status = SCH_STATUS_NOT_START;
}
if (NULL == taosArrayPush(job->levels, &level)) {
qError("taosArrayPush failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return:
if (level.subTasks) {
taosArrayDestroy(level.subTasks);
}
SCH_RET(code);
} }
int32_t schJobExecute(SQueryJob *job) {
switch (job->status) {
case SCH_STATUS_NOT_START:
break;
default:
SCH_JOB_ERR_LOG("invalid job status:%d", job->status);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
}
int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
...@@ -108,32 +147,39 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -108,32 +147,39 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
return TSDB_CODE_QRY_INVALID_INPUT; SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
int32_t code = 0;
SQueryJob *job = calloc(1, sizeof(SQueryJob)); SQueryJob *job = calloc(1, sizeof(SQueryJob));
if (NULL == job) { if (NULL == job) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
schValidateAndBuildJob(pDag, job); SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
SCH_ERR_JRET(schJobExecute(job));
*(SQueryJob **)pJob = job; *(SQueryJob **)pJob = job;
return TSDB_CODE_SUCCESS;
_return:
*(SQueryJob **)pJob = NULL;
scheduleFreeJob(job);
SCH_RET(code);
} }
int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleFetchRows(void *pJob, void *data);
int32_t scheduleCancelJob(void *pJob); int32_t scheduleCancelJob(void *pJob);
void scheduleFreeJob(void *pJob) {
}
void schedulerDestroy(void) { void schedulerDestroy(void) {
if (schMgmt.Jobs) { if (schMgmt.Jobs) {
taosHashCleanup(schMgmt.Jobs); //TBD taosHashCleanup(schMgmt.Jobs); //TBD
......
...@@ -217,22 +217,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist ...@@ -217,22 +217,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist
// mnode-stable // mnode-stable
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, "Table name too long") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "Stable not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STBS, "Too many stables")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, "Invalid table type in tsdb") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB, "Invalid stable name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "Stable options not changed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, "Too many time series")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, "Not super table")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, "Tag name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, "Tag already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, "Tag already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_ALREAY_EXIST, "Field already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_NOT_EXIST, "Field does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREAY_EXIST, "Column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STABLE_NAME, "Super table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG, "Invalid create table message")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES, "Exceed max row bytes") TAOS_DEFINE_ERROR(TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES, "Exceed max row bytes")
// mnode-func
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_NOT_EXIST, "Func not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_NOT_EXIST, "Func not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC, "Invalid func") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC, "Invalid func")
...@@ -241,9 +239,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment" ...@@ -241,9 +239,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_COMMENT, "Invalid func comment"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_CODE, "Invalid func code") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_CODE, "Invalid func code")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_BUFSIZE, "Invalid func bufSize")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TAG_LENGTH, "invalid tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_COLUMN_LENGTH, "invalid column length")
// dnode // dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting")
...@@ -501,6 +496,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready" ...@@ -501,6 +496,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready"
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
//scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
......
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
ADD_SUBDIRECTORY(shell)
ADD_SUBDIRECTORY(taosdemo)
ADD_SUBDIRECTORY(taosdump)
ADD_SUBDIRECTORY(taospack)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
PROJECT(TDengine)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
INCLUDE_DIRECTORIES(inc)
IF (TD_LINUX)
AUX_SOURCE_DIRECTORY(./src SRC)
LIST(REMOVE_ITEM SRC ./src/shellWindows.c)
LIST(REMOVE_ITEM SRC ./src/shellDarwin.c)
ADD_EXECUTABLE(shell ${SRC})
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
ELSE ()
SET(LINK_JEMALLOC "")
ENDIF ()
IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(shell taos_static lua ${LINK_JEMALLOC})
ELSE ()
TARGET_LINK_LIBRARIES(shell taos lua ${LINK_JEMALLOC})
ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
ELSEIF (TD_WINDOWS)
LIST(APPEND SRC ./src/shellEngine.c)
LIST(APPEND SRC ./src/shellMain.c)
LIST(APPEND SRC ./src/shellWindows.c)
ADD_EXECUTABLE(shell ${SRC})
TARGET_LINK_LIBRARIES(shell taos_static)
IF (TD_POWER)
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME power)
ELSE ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
ENDIF ()
ELSEIF (TD_DARWIN)
LIST(APPEND SRC ./src/shellEngine.c)
LIST(APPEND SRC ./src/shellMain.c)
LIST(APPEND SRC ./src/shellDarwin.c)
LIST(APPEND SRC ./src/shellCommand.c)
LIST(APPEND SRC ./src/shellImport.c)
LIST(APPEND SRC ./src/shellCheck.c)
ADD_EXECUTABLE(shell ${SRC})
# linking with dylib
TARGET_LINK_LIBRARIES(shell taos)
# linking taos statically
# TARGET_LINK_LIBRARIES(shell taos_static)
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
ENDIF ()
#add_subdirectory(shell)
\ No newline at end of file
aux_source_directory(src SHELL_SRC)
list(REMOVE_ITEM SHELL_SRC ./src/shellWindows.c)
list(REMOVE_ITEM SHELL_SRC ./src/shellDarwin.c)
add_executable(shell ${SHELL_SRC})
target_link_libraries(
shell
PUBLIC taos
PUBLIC util
PUBLIC os
)
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册