提交 75e2ee10 编写于 作者: L Liu Jicong

tq consume refactor

...@@ -106,6 +106,10 @@ pipeline { ...@@ -106,6 +106,10 @@ pipeline {
abortPreviousBuilds() abortPreviousBuilds()
} }
pre_test() pre_test()
sh'''
cd ${WKC}/tests
./test-all.sh b1fq
'''
} }
} }
// stage('Parallel test stage') { // stage('Parallel test stage') {
......
# stub
ExternalProject_Add(stub
GIT_REPOSITORY https://github.com/coolxv/cpp-stub.git
GIT_SUBMODULES "src"
SOURCE_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub"
BINARY_DIR "${CMAKE_CONTRIB_DIR}/cpp-stub/src"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
...@@ -12,6 +12,7 @@ configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE} ...@@ -12,6 +12,7 @@ configure_file("${CMAKE_SUPPORT_DIR}/deps_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}
# googletest # googletest
if(${BUILD_TEST}) if(${BUILD_TEST})
cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${CMAKE_SUPPORT_DIR}/gtest_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${CMAKE_SUPPORT_DIR}/stub_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_TEST}) endif(${BUILD_TEST})
# lz4 # lz4
...@@ -79,6 +80,11 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build . ...@@ -79,6 +80,11 @@ execute_process(COMMAND "${CMAKE_COMMAND}" --build .
# googletest # googletest
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(googletest) add_subdirectory(googletest)
target_include_directories(
gtest
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src>
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_linux>
)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
# cJson # cJson
......
...@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) ...@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
...@@ -78,8 +78,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB, "create-stb" ) ...@@ -78,8 +78,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB, "create-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB, "alter-stb" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB, "alter-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB, "drop-stb" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB, "drop-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
...@@ -94,22 +93,22 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) ...@@ -94,22 +93,22 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from vnode to dnode // message from vnode to dnode
// message from mnode to vnode // message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-in" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB_IN, "create-stb-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-in" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB_IN, "alter-stb-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-in" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB_IN, "drop-stb-internal" )
// message from mnode to mnode // message from mnode to mnode
// message from mnode to qnode // message from mnode to qnode
// message from mnode to dnode // message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_MNODE_IN, "alter-mnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode-internal" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode-internal" )
// message from qnode to vnode // message from qnode to vnode
// message from qnode to mnode // message from qnode to mnode
...@@ -312,11 +311,11 @@ typedef struct { ...@@ -312,11 +311,11 @@ typedef struct {
typedef struct { typedef struct {
int32_t len; // one create table message int32_t len; // one create table message
char tableName[TSDB_TABLE_FNAME_LEN]; char tableName[TSDB_TABLE_FNAME_LEN];
int8_t igExists;
int8_t getMeta;
int16_t numOfTags; int16_t numOfTags;
int16_t numOfColumns; int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int8_t igExists;
int8_t rspMeta;
int8_t reserved[16]; int8_t reserved[16];
char schema[]; char schema[];
} SCreateTableMsg; } SCreateTableMsg;
...@@ -341,16 +340,37 @@ typedef struct { ...@@ -341,16 +340,37 @@ typedef struct {
} SAlterStbMsg; } SAlterStbMsg;
typedef struct { typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; SMsgHead head;
char db[TSDB_FULL_DB_NAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int16_t type; /* operation type */ uint64_t suid;
int16_t numOfCols; /* number of schema */ int32_t sverson;
int32_t tagValLen; uint32_t ttl;
SSchema schema[]; uint32_t keep;
// tagVal is padded after schema int32_t numOfTags;
// char tagVal[]; int32_t numOfColumns;
SSchema pSchema[];
} SCreateStbInternalMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
uint64_t suid;
} SDropStbInternalMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
int8_t type; /* operation type */
int32_t numOfCols; /* number of schema */
int32_t numOfTags;
char data[];
} SAlterTableMsg; } SAlterTableMsg;
typedef struct {
SMsgHead head;
char name[TSDB_TABLE_FNAME_LEN];
} SDropTableMsg;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t uid; int64_t uid;
...@@ -591,8 +611,8 @@ typedef struct { ...@@ -591,8 +611,8 @@ typedef struct {
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
int32_t daysToKeep2; int32_t daysToKeep2;
int32_t minRowsPerFileBlock; int32_t minRows;
int32_t maxRowsPerFileBlock; int32_t maxRows;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod; int32_t fsyncPeriod;
int8_t walLevel; int8_t walLevel;
...@@ -706,7 +726,7 @@ typedef struct { ...@@ -706,7 +726,7 @@ typedef struct {
SVnodeLoad data[]; SVnodeLoad data[];
} SVnodeLoads; } SVnodeLoads;
typedef struct SStatusMsg { typedef struct {
int32_t sver; int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
...@@ -756,6 +776,7 @@ typedef struct { ...@@ -756,6 +776,7 @@ typedef struct {
int32_t dnodeId; int32_t dnodeId;
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
uint64_t dbUid; uint64_t dbUid;
int32_t vgVersion;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
int32_t daysPerFile; int32_t daysPerFile;
...@@ -942,18 +963,6 @@ typedef struct { ...@@ -942,18 +963,6 @@ typedef struct {
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc; } SQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
char dstTable[TSDB_TABLE_NAME_LEN];
int32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
int64_t ctime;
int64_t stime;
int64_t slidingTime;
int64_t interval;
} SStreamDesc;
typedef struct { typedef struct {
int32_t connId; int32_t connId;
int32_t pid; int32_t pid;
...@@ -1090,6 +1099,7 @@ typedef struct { ...@@ -1090,6 +1099,7 @@ typedef struct {
} SUpdateTagValRsp; } SUpdateTagValRsp;
typedef struct SSchedulerQueryMsg { typedef struct SSchedulerQueryMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
uint32_t contentLen; uint32_t contentLen;
...@@ -1097,15 +1107,39 @@ typedef struct SSchedulerQueryMsg { ...@@ -1097,15 +1107,39 @@ typedef struct SSchedulerQueryMsg {
} SSchedulerQueryMsg; } SSchedulerQueryMsg;
typedef struct SSchedulerReadyMsg { typedef struct SSchedulerReadyMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SSchedulerReadyMsg; } SSchedulerReadyMsg;
typedef struct SSchedulerFetchMsg { typedef struct SSchedulerFetchMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SSchedulerFetchMsg; } SSchedulerFetchMsg;
typedef struct SSchedulerStatusMsg {
uint64_t schedulerId;
} SSchedulerStatusMsg;
typedef struct STaskStatus {
uint64_t queryId;
uint64_t taskId;
int8_t status;
} STaskStatus;
typedef struct SSchedulerStatusRsp {
uint32_t num;
STaskStatus status[];
} SSchedulerStatusRsp;
typedef struct SSchedulerCancelMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerCancelMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -35,7 +35,7 @@ enum { ...@@ -35,7 +35,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SELECT, "select" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_FETCH, "fetch" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_INSERT, "insert" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAGS_VAL, "update-tag-val" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_UPDATE_TAG_VAL, "update-tag-val" )
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
...@@ -54,7 +54,7 @@ enum { ...@@ -54,7 +54,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" )
TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica") TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DNODE, "create-dnode" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DNODE, "create-dnode" )
...@@ -102,14 +102,6 @@ enum { ...@@ -102,14 +102,6 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MAX, "max" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MAX, "max" )
}; };
// create table operation type
enum TSQL_CREATE_TABLE_TYPE {
TSQL_CREATE_TABLE = 0x1,
TSQL_CREATE_STABLE = 0x2,
TSQL_CREATE_CTABLE = 0x3,
TSQL_CREATE_STREAM = 0x4,
};
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
#ifndef TDENGINE_TNAME_H #ifndef TDENGINE_TNAME_H
#define TDENGINE_TNAME_H #define TDENGINE_TNAME_H
//#include "taosmsg.h"
#define TSDB_DB_NAME_T 1 #define TSDB_DB_NAME_T 1
#define TSDB_TABLE_NAME_T 2 #define TSDB_TABLE_NAME_T 2
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
typedef struct { typedef struct {
uint64_t numOfStartTask; uint64_t numOfStartTask;
...@@ -32,48 +32,6 @@ typedef struct { ...@@ -32,48 +32,6 @@ typedef struct {
uint64_t numOfErrors; uint64_t numOfErrors;
} SQnodeStat; } SQnodeStat;
/* start Task msg */
typedef struct {
uint32_t schedulerIp;
uint16_t schedulerPort;
int64_t taskId;
int64_t queryId;
uint32_t srcIp;
uint16_t srcPort;
} SQnodeStartTaskMsg;
/* stop Task msg */
typedef struct {
int64_t taskId;
} SQnodeStopTaskMsg;
/* start/stop Task msg response */
typedef struct {
int64_t taskId;
int32_t code;
} SQnodeTaskRespMsg;
/* Task status msg */
typedef struct {
int64_t taskId;
int32_t status;
int64_t queryId;
} SQnodeTaskStatusMsg;
/* Qnode/Scheduler heartbeat msg */
typedef struct {
int32_t status;
int32_t load;
} SQnodeHeartbeatMsg;
/* Qnode sent/received msg */
typedef struct {
int8_t msgType;
int32_t msgLen;
char msg[];
} SQnodeMsg;
/** /**
* Start one Qnode in Dnode. * Start one Qnode in Dnode.
......
...@@ -142,6 +142,36 @@ int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); ...@@ -142,6 +142,36 @@ int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
*/ */
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a query message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a fetch message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a consume message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessConsumeReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/* ------------------------ SVnodeCfg ------------------------ */ /* ------------------------ SVnodeCfg ------------------------ */
/** /**
* @brief Initialize VNODE options. * @brief Initialize VNODE options.
...@@ -180,73 +210,52 @@ typedef struct { ...@@ -180,73 +210,52 @@ typedef struct {
char info[]; char info[];
} SVnodeRsp; } SVnodeRsp;
#define VNODE_INIT_CREATE_STB_REQ(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \ static FORCE_INLINE void vnodeSetCreateStbReq(SVnodeReq *pReq, char *name, uint32_t ttl, uint32_t keep, tb_uid_t suid,
{ .ver = 0, .ctReq = META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) } STSchema *pSchema, STSchema *pTagSchema) {
pReq->ver = 0;
pReq->ctReq.name = name;
pReq->ctReq.ttl = ttl;
pReq->ctReq.keep = keep;
pReq->ctReq.type = META_SUPER_TABLE;
pReq->ctReq.stbCfg.suid = suid;
pReq->ctReq.stbCfg.pSchema = pSchema;
pReq->ctReq.stbCfg.pTagSchema = pTagSchema;
}
static FORCE_INLINE void vnodeSetCreateCtbReq(SVnodeReq *pReq, char *name, uint32_t ttl, uint32_t keep, tb_uid_t suid,
SKVRow pTag) {
pReq->ver = 0;
#define VNODE_INIT_CREATE_CTB_REQ(NAME, TTL, KEEP, SUID, PTAG) \ pReq->ctReq.name = name;
{ .ver = 0, .ctReq = META_INIT_CTB_CFG(NAME, TTL, KEEP, SUID, PTAG) } pReq->ctReq.ttl = ttl;
pReq->ctReq.keep = keep;
pReq->ctReq.type = META_CHILD_TABLE;
pReq->ctReq.ctbCfg.suid = suid;
pReq->ctReq.ctbCfg.pTag = pTag;
}
#define VNODE_INIT_CREATE_NTB_REQ(NAME, TTL, KEEP, SUID, PSCHEMA) \ static FORCE_INLINE void vnodeSetCreateNtbReq(SVnodeReq *pReq, char *name, uint32_t ttl, uint32_t keep,
{ .ver = 0, .ctReq = META_INIT_NTB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA) } STSchema *pSchema) {
pReq->ver = 0;
pReq->ctReq.name = name;
pReq->ctReq.ttl = ttl;
pReq->ctReq.keep = keep;
pReq->ctReq.type = META_NORMAL_TABLE;
pReq->ctReq.ntbCfg.pSchema = pSchema;
}
int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type); int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type);
void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type); void *vnodeParseReq(void *buf, SVnodeReq *pReq, uint8_t type);
/* ------------------------ FOR COMPILE ------------------------ */ /* ------------------------ FOR COMPILE ------------------------ */
#if 1
#include "taosmsg.h"
#include "trpc.h"
// typedef struct {
// char db[TSDB_FULL_DB_NAME_LEN];
// int32_t cacheBlockSize; // MB
// int32_t totalBlocks;
// int32_t daysPerFile;
// int32_t daysToKeep0;
// int32_t daysToKeep1;
// int32_t daysToKeep2;
// int32_t minRowsPerFileBlock;
// int32_t maxRowsPerFileBlock;
// int8_t precision; // time resolution
// int8_t compression;
// int8_t cacheLastRow;
// int8_t update;
// int8_t quorum;
// int8_t replica;
// int8_t selfIndex;
// int8_t walLevel;
// int32_t fsyncPeriod; // millisecond
// SReplica replicas[TSDB_MAX_REPLICA];
// } SVnodeCfg;
typedef enum {
VN_MSG_TYPE_WRITE = 1,
VN_MSG_TYPE_APPLY,
VN_MSG_TYPE_SYNC,
VN_MSG_TYPE_QUERY,
VN_MSG_TYPE_FETCH
} EVnMsgType;
typedef struct {
int32_t curNum;
int32_t allocNum;
SRpcMsg rpcMsg[];
} SVnodeMsg;
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg);
void vnodeCleanupMsg(SVnodeMsg *pMsg);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,63 +23,67 @@ ...@@ -23,63 +23,67 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SIndex SIndex; typedef struct SIndex SIndex;
typedef struct SIndexTerm SIndexTerm; typedef struct SIndexTerm SIndexTerm;
typedef struct SIndexOpts SIndexOpts; typedef struct SIndexOpts SIndexOpts;
typedef struct SIndexMultiTermQuery SIndexMultiTermQuery; typedef struct SIndexMultiTermQuery SIndexMultiTermQuery;
typedef struct SArray SIndexMultiTerm; typedef struct SArray SIndexMultiTerm;
typedef enum { typedef enum {
ADD_VALUE, // add index colume value ADD_VALUE, // add index colume value
DEL_VALUE, // delete index column value DEL_VALUE, // delete index column value
UPDATE_VALUE, // update index column value UPDATE_VALUE, // update index column value
ADD_INDEX, // add index on specify column ADD_INDEX, // add index on specify column
DROP_INDEX, // drop existed index DROP_INDEX, // drop existed index
DROP_SATBLE // drop stable DROP_SATBLE // drop stable
} SIndexOperOnColumn; } SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType; typedef enum { MUST = 0, SHOULD = 1, NOT = 2 } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX = 3} EIndexQueryType; typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2, QUERY_REGEX = 3 } EIndexQueryType;
/* /*
* @param: oper * @param: oper
* *
*/ */
SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType oper); SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType oper);
void indexMultiTermQueryDestroy(SIndexMultiTermQuery *pQuery); void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery);
int indexMultiTermQueryAdd(SIndexMultiTermQuery *pQuery, SIndexTerm *term, EIndexQueryType type); int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EIndexQueryType type);
/* /*
* @param: * @param:
* @param: * @param:
*/ */
int indexOpen(SIndexOpts *opt, const char *path, SIndex **index); int indexOpen(SIndexOpts* opt, const char* path, SIndex** index);
void indexClose(SIndex *index); void indexClose(SIndex* index);
int indexPut(SIndex *index, SIndexMultiTerm *terms, uint64_t uid); int indexPut(SIndex* index, SIndexMultiTerm* terms, uint64_t uid);
int indexDelete(SIndex *index, SIndexMultiTermQuery *query); int indexDelete(SIndex* index, SIndexMultiTermQuery* query);
int indexSearch(SIndex *index, SIndexMultiTermQuery *query, SArray *result); int indexSearch(SIndex* index, SIndexMultiTermQuery* query, SArray* result);
int indexRebuild(SIndex *index, SIndexOpts *opt); int indexRebuild(SIndex* index, SIndexOpts* opt);
/* /*
* @param * @param
* @param * @param
*/ */
SIndexMultiTerm *indexMultiTermCreate(); SIndexMultiTerm* indexMultiTermCreate();
int indexMultiTermAdd(SIndexMultiTerm *terms, SIndexTerm *term); int indexMultiTermAdd(SIndexMultiTerm* terms, SIndexTerm* term);
void indexMultiTermDestroy(SIndexMultiTerm *terms); void indexMultiTermDestroy(SIndexMultiTerm* terms);
/* /*
* @param: * @param:
* @param: * @param:
*/ */
SIndexOpts *indexOptsCreate(); SIndexOpts* indexOptsCreate();
void indexOptsDestroy(SIndexOpts *opts); void indexOptsDestroy(SIndexOpts* opts);
/* /*
* @param: * @param:
* @param: * @param:
*/ */
SIndexTerm *indexTermCreate(int64_t suid, SIndexOperOnColumn operType, uint8_t colType, SIndexTerm* indexTermCreate(int64_t suid,
const char *colName, int32_t nColName, const char *colVal, int32_t nColVal); SIndexOperOnColumn operType,
void indexTermDestroy(SIndexTerm *p); uint8_t colType,
const char* colName,
int32_t nColName,
const char* colVal,
int32_t nColVal);
void indexTermDestroy(SIndexTerm* p);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_PARSENODES_H_
#define _TD_PARSENODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "catalog.h"
#include "common.h"
#include "function.h"
#include "tmsgtype.h"
#include "tname.h"
#include "tvariant.h"
/*
* The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode.
*/
typedef struct SQueryNode {
int16_t type;
} SQueryNode;
#define nodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
} SField;
typedef struct SParseBasicCtx {
const char *db;
int32_t acctId;
uint64_t requestId;
} SParseBasicCtx;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
int16_t type; // normal column/tag/ user input constant column
} SColumnIndex;
// select statement
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SStateWindow stateWindow; // state window query
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray** exprList; // SArray<SExprInfo*>
SLimit limit;
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SArray * order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
struct SQueryStmtInfo *pDownstream;
SMultiFunctionsDesc info;
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
int16_t nodeType;
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
#ifdef __cplusplus
}
#endif
#endif /*_TD_PARSENODES_H_*/
...@@ -20,108 +20,7 @@ ...@@ -20,108 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "catalog.h" #include "parsenodes.h"
#include "common.h"
#include "tname.h"
#include "tvariant.h"
#include "function.h"
typedef struct SField {
char name[TSDB_COL_NAME_LEN];
uint8_t type;
int16_t bytes;
} SField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
SField *final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SCond {
uint64_t uid;
int32_t len; // length of tag query condition data
char * cond;
} SCond;
typedef struct SJoinNode {
uint64_t uid;
int16_t tagColId;
SArray* tsJoin;
SArray* tagJoin;
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
int16_t relType; // relation between tbname list and query condition, including : TK_AND or TK_OR
SCond tbnameCond; // tbname query condition, only support tbname query condition on one table
SJoinInfo joinInfo; // join condition, only support two tables join currently
SArray *pCond; // for different table, the query condition must be seperated
} STagCond;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
SVgroupsInfo *vgroupList;
SName name;
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
typedef struct SQueryStmtInfo {
int16_t command; // the command may be different for each subclause, so keep it seperately.
uint32_t type; // query/insert type
STimeWindow window; // the whole query time window
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SStateWindow stateWindow; // state window query
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray** exprList; // SArray<SExprInfo*>
SLimit limit;
SLimit slimit;
STagCond tagCond;
SArray * colCond;
SArray * order;
int16_t numOfTables;
int16_t curTableIdx;
STableMetaInfo **pTableMetaInfo;
struct STSBuf *tsBuf;
int16_t fillType; // final result fill type
int64_t * fillVal; // default value for fill
int32_t numOfFillVal; // fill value size
char * msg; // pointer to the pCmd->payload to keep error message temporarily
int64_t clauseLimit; // limit for current sub clause
int64_t prjOffset; // offset value in the original sql expression, only applied at client side
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int32_t bufLen;
char* buf;
SArray *pUdfInfo;
struct SQueryStmtInfo *sibling; // sibling
struct SQueryStmtInfo *pDownstream;
SMultiFunctionsDesc info;
SArray *pUpstream; // SArray<struct SQueryStmtInfo>
int32_t havingFieldNum;
int32_t exprListLevelIndex;
} SQueryStmtInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
int16_t type; // normal column/tag/ user input constant column
} SColumnIndex;
struct SInsertStmtInfo;
/** /**
* True will be returned if the input sql string is insert, false otherwise. * True will be returned if the input sql string is insert, false otherwise.
...@@ -132,17 +31,16 @@ struct SInsertStmtInfo; ...@@ -132,17 +31,16 @@ struct SInsertStmtInfo;
bool qIsInsertSql(const char* pStr, size_t length); bool qIsInsertSql(const char* pStr, size_t length);
typedef struct SParseContext { typedef struct SParseContext {
const char* pAcctId; SParseBasicCtx ctx;
const char* pDbname; void *pRpc;
void *pRpc; struct SCatalog *pCatalog;
const char* pClusterId; const SEpSet *pEpSet;
const SEpSet* pEpSet; int64_t id; // query id, generated by uuid generator
int64_t id; // query id, generated by uuid generator int8_t schemaAttached; // denote if submit block is built with table schema or not
int8_t schemaAttached; // denote if submit block is built with table schema or not const char *pSql; // sql string
const char* pSql; // sql string size_t sqlLen; // length of the sql string
size_t sqlLen; // length of the sql string char *pMsg; // extended error message if exists to help avoid the problem in sql statement.
char* pMsg; // extended error message if exists to help avoid the problem in sql statement. int32_t msgLen; // max length of the msg
int32_t msgLen; // max length of the msg
} SParseContext; } SParseContext;
/** /**
...@@ -153,27 +51,7 @@ typedef struct SParseContext { ...@@ -153,27 +51,7 @@ typedef struct SParseContext {
* @param msg extended error message if exists. * @param msg extended error message if exists.
* @return error code * @return error code
*/ */
int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen); int32_t qParseQuerySql(const char* pStr, size_t length, SParseBasicCtx* pParseCtx, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen);
typedef enum {
PAYLOAD_TYPE_KV = 0,
PAYLOAD_TYPE_RAW = 1,
} EPayloadType;
typedef struct SVgDataBlocks {
int64_t vgId; // virtual group id
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SInsertStmtInfo {
SArray* pDataBlocks; // data block for each vgroup, SArray<SVgDataBlocks*>.
int8_t schemaAttache; // denote if submit block is built with table schema or not
uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
uint32_t insertType; // insert data from [file|sql statement| bound statement]
const char* sql; // current sql statement position
} SInsertStmtInfo;
/** /**
* Parse the insert sql statement. * Parse the insert sql statement.
...@@ -210,9 +88,9 @@ typedef struct SSourceParam { ...@@ -210,9 +88,9 @@ typedef struct SSourceParam {
SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, const char* funcName, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize); SExprInfo* createExprInfo(STableMetaInfo* pTableMetaInfo, const char* funcName, SSourceParam* pSource, SSchema* pResSchema, int16_t interSize);
int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); int32_t copyExprInfoList(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t copyAllExprInfo(SArray* dst, const SArray* src, bool deepcopy); int32_t copyAllExprInfo(SArray* dst, const SArray* src, bool deepcopy);
int32_t getExprFunctionLevel(SQueryStmtInfo* pQueryInfo); int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex); STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex); SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name); SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
#define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2 #define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3 #define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
enum OPERATOR_TYPE_E { enum OPERATOR_TYPE_E {
OP_Unknown, OP_Unknown,
...@@ -58,18 +59,17 @@ typedef struct SQueryNodeBasicInfo { ...@@ -58,18 +59,17 @@ typedef struct SQueryNodeBasicInfo {
typedef struct SDataSink { typedef struct SDataSink {
SQueryNodeBasicInfo info; SQueryNodeBasicInfo info;
SDataBlockSchema schema;
} SDataSink; } SDataSink;
typedef struct SDataDispatcher { typedef struct SDataDispatcher {
SDataSink sink; SDataSink sink;
// todo
} SDataDispatcher; } SDataDispatcher;
typedef struct SDataInserter { typedef struct SDataInserter {
SDataSink sink; SDataSink sink;
uint64_t uid; // unique id of the table int32_t numOfTables;
// todo data field uint32_t size;
char *pData;
} SDataInserter; } SDataInserter;
typedef struct SPhyNode { typedef struct SPhyNode {
...@@ -119,12 +119,13 @@ typedef struct SSubplanId { ...@@ -119,12 +119,13 @@ typedef struct SSubplanId {
typedef struct SSubplan { typedef struct SSubplan {
SSubplanId id; // unique id of the subplan SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN
int32_t level; // the execution level of current subplan, starting from 0. int32_t level; // the execution level of current subplan, starting from 0.
SEpSet execEpSet; // for the scan sub plan, the optional execution node SEpSet execEpSet; // for the scan sub plan, the optional execution node
SArray *pChildern; // the datasource subplan,from which to fetch the result SArray *pChildern; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan SPhyNode *pNode; // physical plan of current subplan
SDataSink *pDataSink; // data of the subplan flow into the datasink
} SSubplan; } SSubplan;
typedef struct SQueryDag { typedef struct SQueryDag {
...@@ -133,10 +134,12 @@ typedef struct SQueryDag { ...@@ -133,10 +134,12 @@ typedef struct SQueryDag {
SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
} SQueryDag; } SQueryDag;
struct SQueryNode;
/** /**
* Create the physical plan for the query, according to the AST. * Create the physical plan for the query, according to the AST.
*/ */
int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag); int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, struct SQueryDag** pDag);
// Set datasource of this subplan, multiple calls may be made to a subplan. // Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule // @subplan subplan to be schedule
...@@ -144,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* ...@@ -144,7 +147,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
// @ep one execution location of this group of datasource subplans // @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
/** /**
* Convert to subplan to string for the scheduler to send to the executor * Convert to subplan to string for the scheduler to send to the executor
......
...@@ -24,6 +24,15 @@ extern "C" { ...@@ -24,6 +24,15 @@ extern "C" {
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
enum {
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED
};
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
...@@ -81,15 +90,27 @@ typedef struct STableMetaOutput { ...@@ -81,15 +90,27 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta; STableMeta *tbMeta;
} STableMetaOutput; } STableMetaOutput;
typedef int32_t __async_exec_fn_t(void* param);
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); int32_t initTaskQueue();
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize); int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
SSchema* tGetTbnameColumnSchema(); SSchema* tGetTbnameColumnSchema();
extern void msgInit(); void msgInit();
extern int32_t qDebugFlag; extern int32_t (*queryBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0) #define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", qDebugFlag, __VA_ARGS__); }} while(0)
#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0) #define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", qDebugFlag, __VA_ARGS__); }} while(0)
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_H_
#define _TD_QWORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "trpc.h"
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
uint32_t maxResCacheNum;
uint32_t maxSchTaskNum;
} SQWorkerCfg;
typedef struct {
uint64_t numOfStartTask;
uint64_t numOfStopTask;
uint64_t numOfRecvedFetch;
uint64_t numOfSentHb;
uint64_t numOfSentFetch;
uint64_t numOfTaskInQueue;
uint64_t numOfFetchInQueue;
uint64_t numOfErrors;
} SQWorkerStat;
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QWORKER_H_*/
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
typedef struct SSchedulerCfg { typedef struct SSchedulerCfg {
int32_t clusterType; int32_t clusterType;
int32_t maxJobNum;
} SSchedulerCfg; } SSchedulerCfg;
typedef struct SQueryProfileSummary { typedef struct SQueryProfileSummary {
......
...@@ -230,6 +230,10 @@ int32_t* taosGetErrno(); ...@@ -230,6 +230,10 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5) #define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x03C5)
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6) #define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x03C6)
// mnode-trans
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
// dnode // dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401) #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401)
......
...@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); ...@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
/** /**
* Get the corresponding key information for a given data in hash table * Get the corresponding key information for a given data in hash table
* @param pHashObj
* @param data * @param data
* @return * @return
*/ */
int32_t taosHashGetKey(SHashObj *pHashObj, void *data, void** key, size_t* keyLen); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,8 +24,8 @@ extern "C" { ...@@ -24,8 +24,8 @@ extern "C" {
#include "tdef.h" #include "tdef.h"
// create new thread // create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param); pthread_t* taosCreateThread(void* (*__start_routine)(void*), void* param);
// destory thread // destory thread
bool taosDestoryThread(pthread_t* pthread); bool taosDestoryThread(pthread_t* pthread);
// thread running return true // thread running return true
bool taosThreadRunning(pthread_t* pthread); bool taosThreadRunning(pthread_t* pthread);
......
...@@ -86,26 +86,32 @@ typedef struct STscObj { ...@@ -86,26 +86,32 @@ typedef struct STscObj {
SAppInstInfo *pAppInfo; SAppInstInfo *pAppInfo;
} STscObj; } STscObj;
typedef struct SClientResultInfo { typedef struct SReqResultInfo {
const char *pMsg; const char *pRspMsg;
const char *pData; const char *pData;
TAOS_FIELD *fields; TAOS_FIELD *fields;
int32_t numOfCols; uint32_t numOfCols;
int32_t numOfRows;
int32_t current;
int32_t *length; int32_t *length;
TAOS_ROW row; TAOS_ROW row;
char **pCol; char **pCol;
} SClientResultInfo;
typedef struct SReqBody { uint32_t numOfRows;
tsem_t rspSem; // not used now uint32_t current;
void* fp; } SReqResultInfo;
void* param;
int32_t paramLen; typedef struct SReqMsg {
int64_t execId; // showId/queryId void *pMsg;
SClientResultInfo* pResInfo; uint32_t len;
} SRequestBody; } SReqMsgInfo;
typedef struct SRequestSendRecvBody {
tsem_t rspSem; // not used now
void* fp;
int64_t execId; // showId/queryId
SReqMsgInfo requestMsg;
SReqResultInfo resInfo;
} SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
...@@ -115,7 +121,7 @@ typedef struct SRequestObj { ...@@ -115,7 +121,7 @@ typedef struct SRequestObj {
STscObj *pTscObj; STscObj *pTscObj;
SQueryExecMetric metric; SQueryExecMetric metric;
char *sqlstr; // sql string char *sqlstr; // sql string
SRequestBody body; SRequestSendRecvBody body;
int64_t self; int64_t self;
char *msgBuf; char *msgBuf;
int32_t code; int32_t code;
...@@ -123,11 +129,10 @@ typedef struct SRequestObj { ...@@ -123,11 +129,10 @@ typedef struct SRequestObj {
} SRequestObj; } SRequestObj;
typedef struct SRequestMsgBody { typedef struct SRequestMsgBody {
int32_t msgType; int32_t msgType;
void *pData; SReqMsgInfo msgInfo;
int32_t msgLen; uint64_t requestId;
uint64_t requestId; uint64_t requestObjRefId;
uint64_t requestObjRefId;
} SRequestMsgBody; } SRequestMsgBody;
extern SAppInfo appInfo; extern SAppInfo appInfo;
...@@ -143,13 +148,16 @@ int taos_init(); ...@@ -143,13 +148,16 @@ int taos_init();
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo); void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj); void destroyTscObj(void*pObj);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type); void *createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest); void destroyRequest(SRequestObj* pRequest);
char *getConnectionDB(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);
void taos_init_imp(void); void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str); int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth); void* openTransporter(const char *user, const char *auth, int32_t numOfThreads);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp(); void initMsgHandleFp();
...@@ -158,7 +166,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -158,7 +166,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
void* doFetchRow(SRequestObj* pRequest); void* doFetchRow(SRequestObj* pRequest);
void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,17 +13,17 @@ ...@@ -13,17 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "clientInt.h"
#include "clientLog.h"
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "query.h"
#include "clientInt.h"
#include "clientLog.h"
#include "tcache.h" #include "tcache.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "tnote.h" #include "tnote.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h"
#include "ttime.h" #include "ttime.h"
#include "ttimezone.h" #include "ttimezone.h"
...@@ -33,10 +33,8 @@ ...@@ -33,10 +33,8 @@
SAppInfo appInfo; SAppInfo appInfo;
int32_t tscReqRef = -1; int32_t tscReqRef = -1;
int32_t tscConnRef = -1; int32_t tscConnRef = -1;
void *tscQhandle = NULL;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT; static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
int32_t tsNumOfThreads = 1;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) { static void registerRequest(SRequestObj* pRequest) {
...@@ -98,12 +96,12 @@ void closeTransporter(STscObj* pTscObj) { ...@@ -98,12 +96,12 @@ void closeTransporter(STscObj* pTscObj) {
} }
// TODO refactor // TODO refactor
void* openTransporter(const char *user, const char *auth) { void* openTransporter(const char *user, const char *auth, int32_t numOfThread) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "TSC"; rpcInit.label = "TSC";
rpcInit.numOfThreads = tsNumOfThreads; rpcInit.numOfThreads = numOfThread;
rpcInit.cfp = processMsgFromServer; rpcInit.cfp = processMsgFromServer;
rpcInit.sessions = tsMaxConnections; rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
...@@ -170,7 +168,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty ...@@ -170,7 +168,7 @@ void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t ty
pRequest->type = type; pRequest->type = type;
pRequest->pTscObj = pObj; pRequest->pTscObj = pObj;
pRequest->body.fp = fp; pRequest->body.fp = fp;
pRequest->body.param = param; // pRequest->body.requestMsg. = param;
pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); pRequest->msgBuf = calloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
tsem_init(&pRequest->body.rspSem, 0, 0); tsem_init(&pRequest->body.rspSem, 0, 0);
...@@ -188,11 +186,7 @@ static void doDestroyRequest(void* p) { ...@@ -188,11 +186,7 @@ static void doDestroyRequest(void* p) {
tfree(pRequest->sqlstr); tfree(pRequest->sqlstr);
tfree(pRequest->pInfo); tfree(pRequest->pInfo);
if (pRequest->body.pResInfo != NULL) { tfree(pRequest->body.resInfo.pRspMsg);
tfree(pRequest->body.pResInfo->pData);
tfree(pRequest->body.pResInfo->pMsg);
tfree(pRequest->body.pResInfo);
}
deregisterRequest(pRequest); deregisterRequest(pRequest);
tfree(pRequest); tfree(pRequest);
...@@ -233,18 +227,8 @@ void taos_init_imp(void) { ...@@ -233,18 +227,8 @@ void taos_init_imp(void) {
taosSetCoreDump(true); taosSetCoreDump(true);
double factor = 4.0; initTaskQueue();
int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2;
tscQhandle = taosInitScheduler(queueSize, numOfThreads, "tsc");
if (NULL == tscQhandle) {
tscError("failed to init task queue");
tscInitRes = -1;
return;
}
tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads);
tscConnRef = taosOpenRef(200, destroyTscObj); tscConnRef = taosOpenRef(200, destroyTscObj);
tscReqRef = taosOpenRef(40960, doDestroyRequest); tscReqRef = taosOpenRef(40960, doDestroyRequest);
......
...@@ -102,9 +102,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, ...@@ -102,9 +102,8 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL) { if (pInst == NULL) {
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
p->mgmtEp = epSet; p->mgmtEp = epSet;
p->pTransporter = openTransporter(user, secretEncrypt); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
pInst = &p; pInst = &p;
...@@ -152,11 +151,14 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -152,11 +151,14 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
int32_t type = 0; int32_t type = 0;
void* output = NULL; void* output = NULL;
int32_t outputLen = 0; int32_t outputLen = 0;
code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE);
if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW || type == TSDB_SQL_DROP_USER || type == TSDB_SQL_CREATE_DB) { SParseBasicCtx c = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)};
code = qParseQuerySql(pRequest->sqlstr, sqlLen, &c, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE);
if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW || type == TSDB_SQL_DROP_USER ||
type == TSDB_SQL_DROP_ACCT || type == TSDB_SQL_CREATE_DB || type == TSDB_SQL_CREATE_ACCT ||
type == TSDB_SQL_CREATE_TABLE || type == TSDB_SQL_USE_DB) {
pRequest->type = type; pRequest->type = type;
pRequest->body.param = output; pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = output, .len = outputLen};
pRequest->body.paramLen = outputLen;
SRequestMsgBody body = {0}; SRequestMsgBody body = {0};
buildRequestMsgFp[type](pRequest, &body); buildRequestMsgFp[type](pRequest, &body);
...@@ -169,6 +171,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { ...@@ -169,6 +171,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
} else { } else {
assert(0); assert(0);
} }
tfree(c.db);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -255,7 +259,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con ...@@ -255,7 +259,7 @@ STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, con
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT; pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
pMsgBody->msgLen = sizeof(SConnectMsg); pMsgBody->msgInfo.len = sizeof(SConnectMsg);
pMsgBody->requestObjRefId = pRequest->self; pMsgBody->requestObjRefId = pRequest->self;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg)); SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
...@@ -279,28 +283,28 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) ...@@ -279,28 +283,28 @@ static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody)
pConnect->startTime = htobe64(appInfo.startTime); pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->pData = pConnect; pMsgBody->msgInfo.pMsg = pConnect;
return 0; return 0;
} }
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) { static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) {
assert(pMsgBody != NULL); assert(pMsgBody != NULL);
tfree(pMsgBody->pData); tfree(pMsgBody->msgInfo.pMsg);
} }
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) { int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
char *pMsg = rpcMallocCont(pBody->msgLen); char *pMsg = rpcMallocCont(pBody->msgInfo.len);
if (NULL == pMsg) { if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]); tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(pMsg, pBody->pData, pBody->msgLen); memcpy(pMsg, pBody->msgInfo.pMsg, pBody->msgInfo.len);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = pBody->msgType, .msgType = pBody->msgType,
.pCont = pMsg, .pCont = pMsg,
.contLen = pBody->msgLen, .contLen = pBody->msgInfo.len,
.ahandle = (void*) pBody->requestObjRefId, .ahandle = (void*) pBody->requestObjRefId,
.handle = NULL, .handle = NULL,
.code = 0 .code = 0
...@@ -388,7 +392,7 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c ...@@ -388,7 +392,7 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c
void* doFetchRow(SRequestObj* pRequest) { void* doFetchRow(SRequestObj* pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
SClientResultInfo* pResultInfo = pRequest->body.pResInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
pRequest->type = TSDB_SQL_RETRIEVE_MNODE; pRequest->type = TSDB_SQL_RETRIEVE_MNODE;
...@@ -421,7 +425,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -421,7 +425,7 @@ void* doFetchRow(SRequestObj* pRequest) {
return pResultInfo->row; return pResultInfo->row;
} }
void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) { if (numOfRows == 0) {
return; return;
...@@ -436,8 +440,19 @@ void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -436,8 +440,19 @@ void setResultDataPtr(SClientResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
} }
} }
const char *taos_get_client_info() { return version; } char* getConnectionDB(STscObj* pObj) {
char *p = NULL;
pthread_mutex_lock(&pObj->mutex);
p = strndup(pObj->db, tListLen(pObj->db));
pthread_mutex_unlock(&pObj->mutex);
return p;
}
int taos_affected_rows(TAOS_RES *res) { return 1; } void setConnectionDB(STscObj* pTscObj, const char* db) {
assert(db != NULL && pTscObj != NULL);
pthread_mutex_lock(&pTscObj->mutex);
tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
pthread_mutex_unlock(&pTscObj->mutex);
}
int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; }
#include "os.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "os.h" #include "query.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "tnote.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
...@@ -44,9 +39,7 @@ void taos_cleanup(void) { ...@@ -44,9 +39,7 @@ void taos_cleanup(void) {
tscReqRef = -1; tscReqRef = -1;
taosCloseRef(id); taosCloseRef(id);
void* p = tscQhandle; cleanupTaskQueue();
tscQhandle = NULL;
taosCleanUpScheduler(p);
id = tscConnRef; id = tscConnRef;
tscConnRef = -1; tscConnRef = -1;
...@@ -115,12 +108,7 @@ int taos_field_count(TAOS_RES *res) { ...@@ -115,12 +108,7 @@ int taos_field_count(TAOS_RES *res) {
} }
SRequestObj* pRequest = (SRequestObj*) res; SRequestObj* pRequest = (SRequestObj*) res;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SClientResultInfo* pResInfo = pRequest->body.pResInfo;
if (pResInfo == NULL) {
return 0;
}
return pResInfo->numOfCols; return pResInfo->numOfCols;
} }
...@@ -133,7 +121,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { ...@@ -133,7 +121,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return NULL; return NULL;
} }
SClientResultInfo* pResInfo = ((SRequestObj*) res)->body.pResInfo; SReqResultInfo* pResInfo = &(((SRequestObj*) res)->body.resInfo);
return pResInfo->fields; return pResInfo->fields;
} }
...@@ -248,7 +236,7 @@ int* taos_fetch_lengths(TAOS_RES *res) { ...@@ -248,7 +236,7 @@ int* taos_fetch_lengths(TAOS_RES *res) {
return NULL; return NULL;
} }
return ((SRequestObj*) res)->body.pResInfo->length; return ((SRequestObj*) res)->body.resInfo.length;
} }
const char *taos_data_type(int type) { const char *taos_data_type(int type) {
...@@ -267,3 +255,9 @@ const char *taos_data_type(int type) { ...@@ -267,3 +255,9 @@ const char *taos_data_type(int type) {
default: return "UNKNOWN"; default: return "UNKNOWN";
} }
} }
const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) { return 1; }
int taos_result_precision(TAOS_RES *res) { return TSDB_TIME_PRECISION_MILLI; }
...@@ -16,15 +16,15 @@ ...@@ -16,15 +16,15 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <taoserror.h> #include <taoserror.h>
#include <iostream> #include <iostream>
#include "tglobal.h"
#pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare" #pragma GCC diagnostic ignored "-Wsign-compare"
#include "../inc/clientInt.h"
#include "taos.h" #include "taos.h"
#include "tglobal.h"
#include "../inc/clientInt.h"
namespace { namespace {
} // namespace } // namespace
...@@ -40,13 +40,13 @@ TEST(testCase, driverInit_Test) { ...@@ -40,13 +40,13 @@ TEST(testCase, driverInit_Test) {
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, create_user_Test) { TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
...@@ -57,23 +57,36 @@ TEST(testCase, create_user_Test) { ...@@ -57,23 +57,36 @@ TEST(testCase, create_user_Test) {
taos_close(pConn); taos_close(pConn);
} }
//TEST(testCase, drop_user_Test) { TEST(testCase, create_account_Test) {
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST(testCase, show_user_Test) { TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show users"); TAOS_RES* pRes = taos_query(pConn, "show users");
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
...@@ -89,10 +102,23 @@ TEST(testCase, show_user_Test) { ...@@ -89,10 +102,23 @@ TEST(testCase, show_user_Test) {
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, show_db_Test) { TEST(testCase, drop_user_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "show databases"); TAOS_RES* pRes = taos_query(pConn, "show databases");
TAOS_ROW pRow = NULL; TAOS_ROW pRow = NULL;
...@@ -112,7 +138,40 @@ TEST(testCase, create_db_Test) { ...@@ -112,7 +138,40 @@ TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "create database abc"); TAOS_RES* pRes = taos_query(pConn, "create database abc1");
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
taos_close(pConn);
}
TEST(testCase, create_stable_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)");
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL); ASSERT_TRUE(pFields == NULL);
......
此差异已折叠。
...@@ -31,7 +31,7 @@ static struct { ...@@ -31,7 +31,7 @@ static struct {
} global = {0}; } global = {0};
void dmnSigintHandle(int signum, void *info, void *ctx) { void dmnSigintHandle(int signum, void *info, void *ctx) {
uError("singal:%d is received", signum); uInfo("singal:%d is received", signum);
global.stop = true; global.stop = true;
} }
......
...@@ -73,7 +73,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -73,7 +73,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg; pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg;
......
...@@ -176,6 +176,12 @@ SDnode *dndInit(SDnodeOpt *pOption) { ...@@ -176,6 +176,12 @@ SDnode *dndInit(SDnodeOpt *pOption) {
return NULL; return NULL;
} }
if (vnodeInit(1) != 0) {
dError("failed to init vnode env");
dndCleanup(pDnode);
return NULL;
}
if (dndInitDnode(pDnode) != 0) { if (dndInitDnode(pDnode) != 0) {
dError("failed to init dnode"); dError("failed to init dnode");
dndCleanup(pDnode); dndCleanup(pDnode);
...@@ -222,8 +228,10 @@ void dndCleanup(SDnode *pDnode) { ...@@ -222,8 +228,10 @@ void dndCleanup(SDnode *pDnode) {
dndCleanupMnode(pDnode); dndCleanupMnode(pDnode);
dndCleanupVnodes(pDnode); dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode); dndCleanupDnode(pDnode);
vnodeClear();
walCleanUp(); walCleanUp();
rpcCleanup(); rpcCleanup();
dndCleanupEnv(pDnode); dndCleanupEnv(pDnode);
free(pDnode); free(pDnode);
dInfo("TDengine is cleaned up successfully"); dInfo("TDengine is cleaned up successfully");
......
...@@ -8,8 +8,8 @@ add_subdirectory(db) ...@@ -8,8 +8,8 @@ add_subdirectory(db)
add_subdirectory(dnode) add_subdirectory(dnode)
# add_subdirectory(func) # add_subdirectory(func)
# add_subdirectory(mnode) # add_subdirectory(mnode)
# add_subdirectory(profile) add_subdirectory(profile)
# add_subdirectory(show) add_subdirectory(show)
add_subdirectory(stb) add_subdirectory(stb)
# add_subdirectory(sync) # add_subdirectory(sync)
# add_subdirectory(telem) # add_subdirectory(telem)
...@@ -17,4 +17,4 @@ add_subdirectory(stb) ...@@ -17,4 +17,4 @@ add_subdirectory(stb)
add_subdirectory(user) add_subdirectory(user)
add_subdirectory(vgroup) add_subdirectory(vgroup)
# add_subdirectory(common) add_subdirectory(sut)
aux_source_directory(src SUT_SRC)
add_library(sut STATIC ${SUT_SRC})
target_link_libraries(
sut
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories(
sut
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册