提交 a92aa1c5 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

...@@ -36,7 +36,7 @@ enum { ...@@ -36,7 +36,7 @@ enum {
TSDB_MESSAGE_NULL = 0, TSDB_MESSAGE_NULL = 0,
#endif #endif
// message from client to dnode // message from client to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
...@@ -46,25 +46,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" ) ...@@ -46,25 +46,12 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
...@@ -88,6 +75,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" ) ...@@ -88,6 +75,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
...@@ -97,54 +85,55 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" ) ...@@ -97,54 +85,55 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) // message from client to qnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) // message from client to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" )
// message from mnode to dnode // message from vnode to vnode
// message from vnode to mnode
// message from vnode to qnode
// message from vnode to dnode
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" )
// message from qnode to vnode
// message from qnode to mnode
// message from qnode to qnode
// message from qnode to dnode
// message from dnode to vnode
// message from dnode to mnode // message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" ) // message from dnode to qnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" ) // message from dnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
#ifndef TAOS_MESSAGE_C #ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 147 TSDB_MSG_TYPE_MAX // 147
...@@ -336,7 +325,7 @@ typedef struct { ...@@ -336,7 +325,7 @@ typedef struct {
typedef struct { typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int16_t type; /* operation type */ int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */ int16_t numOfCols; /* number of schema */
int32_t tagValLen; int32_t tagValLen;
...@@ -428,10 +417,6 @@ typedef struct { ...@@ -428,10 +417,6 @@ typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
} SDropSTableMsg; } SDropSTableMsg;
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct SColIndex { typedef struct SColIndex {
int16_t colId; // column id int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
...@@ -585,7 +570,7 @@ typedef struct SRetrieveTableRsp { ...@@ -585,7 +570,7 @@ typedef struct SRetrieveTableRsp {
} SRetrieveTableRsp; } SRetrieveTableRsp;
typedef struct { typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t cacheBlockSize; //MB int32_t cacheBlockSize; //MB
int32_t totalBlocks; int32_t totalBlocks;
int32_t maxTables; int32_t maxTables;
...@@ -661,9 +646,8 @@ typedef struct { ...@@ -661,9 +646,8 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t status;
int8_t role; int8_t role;
int8_t reserved[2]; int8_t reserved[3];
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
...@@ -671,8 +655,8 @@ typedef struct { ...@@ -671,8 +655,8 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int32_t vnodeNum; int32_t num;
SVnodeLoad vnodeLoads[]; SVnodeLoad data[];
} SVnodeLoads; } SVnodeLoads;
typedef struct SStatusMsg { typedef struct SStatusMsg {
...@@ -717,7 +701,7 @@ typedef struct { ...@@ -717,7 +701,7 @@ typedef struct {
} SVnodeDesc; } SVnodeDesc;
typedef struct { typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
uint32_t vgId; uint32_t vgId;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
...@@ -737,9 +721,18 @@ typedef struct { ...@@ -737,9 +721,18 @@ typedef struct {
int8_t replica; int8_t replica;
int8_t quorum; int8_t quorum;
int8_t selfIndex; int8_t selfIndex;
SVnodeDesc nodes[TSDB_MAX_REPLICA]; SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg; } SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct {
int32_t vgId;
int8_t accessState;
} SAuthVnodeMsg;
typedef struct { typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag; int16_t createFlag;
...@@ -811,13 +804,13 @@ typedef struct { ...@@ -811,13 +804,13 @@ typedef struct {
*/ */
typedef struct { typedef struct {
int8_t type; int8_t type;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
uint16_t payloadLen; uint16_t payloadLen;
char payload[]; char payload[];
} SShowMsg; } SShowMsg;
typedef struct { typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t vgid[]; int32_t vgid[];
} SCompactMsg; } SCompactMsg;
...@@ -1002,6 +995,7 @@ typedef struct { ...@@ -1002,6 +995,7 @@ typedef struct {
/* data */ /* data */
} SUpdateTagValRsp; } SUpdateTagValRsp;
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct { typedef struct {
char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t cacheBlockSize; // MB int32_t cacheBlockSize; // MB
int32_t totalBlocks; int32_t totalBlocks;
int32_t daysPerFile; int32_t daysPerFile;
...@@ -47,40 +47,43 @@ typedef struct { ...@@ -47,40 +47,43 @@ typedef struct {
SVnodeDesc replicas[TSDB_MAX_REPLICA]; SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef enum {
int64_t totalStorage; VN_MSG_TYPE_WRITE = 1,
int64_t compStorage; VN_MSG_TYPE_APPLY,
int64_t pointsWritten; VN_MSG_TYPE_SYNC,
int64_t tablesNum; VN_MSG_TYPE_QUERY,
} SVnodeStatisic; VN_MSG_TYPE_FETCH
} EVnMsgType;
typedef struct { typedef struct {
int8_t syncRole; int32_t curNum;
} SVnodeStatus; int32_t allocNum;
SRpcMsg rpcMsg[];
typedef struct SVnodeMsg {
int32_t msgType;
int32_t code;
SRpcMsg rpcMsg; // original message from rpc
int32_t contLen;
char pCont[];
} SVnodeMsg; } SVnodeMsg;
int32_t vnodeInit(); typedef struct {
void vnodeCleanup(); void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg);
void (*SendMsgToMnode)(SRpcMsg *pMsg);
int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg);
} SVnodePara;
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat); int32_t vnodeInit(SVnodePara);
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus); void vnodeCleanup();
SVnode *vnodeOpen(int32_t vgId, const char *path); SVnode *vnodeOpen(int32_t vgId, const char *path);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg); SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
int32_t vnodeDrop(SVnode *pVnode); void vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg);
void vnodeCleanupMsg(SVnodeMsg *pMsg);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -223,6 +223,7 @@ int32_t* taosGetErrno(); ...@@ -223,6 +223,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress") #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories") #define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting" #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting"
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error")
// vnode // vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
......
...@@ -153,11 +153,12 @@ do { \ ...@@ -153,11 +153,12 @@ do { \
#define TSDB_NODE_NAME_LEN 64 #define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33 #define TSDB_DB_NAME_LEN 33
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
#define TSDB_FUNC_NAME_LEN 65 #define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_CODE_LEN (65535 - 512) #define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_FUNC_BUF_SIZE 512 #define TSDB_FUNC_BUF_SIZE 512
#define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
...@@ -211,7 +212,7 @@ do { \ ...@@ -211,7 +212,7 @@ do { \
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 64 #define TSDB_MIN_VNODES 64
#define TSDB_MAX_VNODES 2048 #define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 2 #define TSDB_MIN_VNODES_PER_DB 2
#define TSDB_MAX_VNODES_PER_DB 64 #define TSDB_MAX_VNODES_PER_DB 64
......
...@@ -40,8 +40,8 @@ shall be used to set up the protection. ...@@ -40,8 +40,8 @@ shall be used to set up the protection.
typedef void *taos_queue; typedef void *taos_queue;
typedef void *taos_qset; typedef void *taos_qset;
typedef void *taos_qall; typedef void *taos_qall;
typedef void *(*FProcessItem)(void *pItem, void *ahandle); typedef void (*FProcessItem)(void *ahandle, void *pItem);
typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle); typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems);
taos_queue taosOpenQueue(); taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue); void taosCloseQueue(taos_queue);
...@@ -50,6 +50,7 @@ void *taosAllocateQitem(int size); ...@@ -50,6 +50,7 @@ void *taosAllocateQitem(int size);
void taosFreeQitem(void *pItem); void taosFreeQitem(void *pItem);
int taosWriteQitem(taos_queue, void *pItem); int taosWriteQitem(taos_queue, void *pItem);
int taosReadQitem(taos_queue, void **pItem); int taosReadQitem(taos_queue, void **pItem);
bool taosQueueEmpty(taos_queue);
taos_qall taosAllocateQall(); taos_qall taosAllocateQall();
void taosFreeQall(taos_qall); void taosFreeQall(taos_qall);
......
...@@ -201,7 +201,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) { ...@@ -201,7 +201,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
return -1; return -1;
} }
int32_t len = snprintf(dst, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname); int32_t len = snprintf(dst, TSDB_FULL_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname);
size_t tnameLen = strlen(name->tname); size_t tnameLen = strlen(name->tname);
if (tnameLen > 0) { if (tnameLen > 0) {
......
...@@ -42,7 +42,10 @@ void dnodeCleanup(); ...@@ -42,7 +42,10 @@ void dnodeCleanup();
EDnStat dnodeGetRunStat(); EDnStat dnodeGetRunStat();
void dnodeSetRunStat(); void dnodeSetRunStat();
void dnodeGetStartup(SStartupMsg *);
void dnodeReportStartup(char *name, char *desc);
void dnodeReportStartupFinished(char *name, char *desc);
void dnodeGetStartup(SStartupMsg *);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,8 +23,13 @@ extern "C" { ...@@ -23,8 +23,13 @@ extern "C" {
int32_t dnodeInitVnodes(); int32_t dnodeInitVnodes();
void dnodeCleanupVnodes(); void dnodeCleanupVnodes();
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeGetVnodeLoads(SVnodeLoads *pVloads);
void dnodeGetVnodes(SVnodeLoads *pVloads);
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() { ...@@ -372,8 +372,8 @@ static void dnodeSendStatusMsg() {
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
dnodeGetVnodes(&pStatus->vnodeLoads); dnodeGetVnodeLoads(&pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.vnodeNum * sizeof(SVnodeLoad); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
dnodeSendMsgToMnode(&rpcMsg); dnodeSendMsgToMnode(&rpcMsg);
......
...@@ -35,14 +35,14 @@ EDnStat dnodeGetRunStat() { return tsInt.runStat; } ...@@ -35,14 +35,14 @@ EDnStat dnodeGetRunStat() { return tsInt.runStat; }
void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; } void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; }
static void dnodeReportStartup(char *name, char *desc) { void dnodeReportStartup(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup; SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0; pStartup->finished = 0;
} }
static void dnodeReportStartupFinished(char *name, char *desc) { void dnodeReportStartupFinished(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup; SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
......
...@@ -34,24 +34,22 @@ static struct { ...@@ -34,24 +34,22 @@ static struct {
static void dnodeInitMsgFp() { static void dnodeInitMsgFp() {
// msg from client to dnode // msg from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodesMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
// msg from client to mnode // msg from client to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
...@@ -77,6 +75,7 @@ static void dnodeInitMsgFp() { ...@@ -77,6 +75,7 @@ static void dnodeInitMsgFp() {
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
...@@ -85,22 +84,29 @@ static void dnodeInitMsgFp() { ...@@ -85,22 +84,29 @@ static void dnodeInitMsgFp() {
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
// message from mnode to dnode // message from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
// message from mnode to vnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodesMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodesMsg; tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
......
...@@ -14,13 +14,1016 @@ ...@@ -14,13 +14,1016 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeDnode.h" #include "dnodeVnodes.h"
#include "dnodeTransport.h"
#include "cJSON.h"
#include "thash.h"
#include "tlockfree.h"
#include "tqueue.h"
#include "tstep.h"
#include "tthread.h"
#include "tworker.h"
#include "vnode.h" #include "vnode.h"
int32_t dnodeInitVnodes() { return vnodeInit(); } typedef struct {
int32_t vgId;
int32_t refCount;
int8_t dropped;
int8_t accessState;
SVnode *pImpl;
taos_queue pWriteQ;
taos_queue pSyncQ;
taos_queue pApplyQ;
taos_queue pQueryQ;
taos_queue pFetchQ;
} SVnodeObj;
void dnodeCleanupVnodes() { vnodeCleanup(); } typedef struct {
pthread_t *threadId;
int32_t threadIndex;
int32_t failed;
int32_t opened;
int32_t vnodeNum;
SVnodeObj *pVnodes;
} SVThread;
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } static struct {
SHashObj *hash;
SWorkerPool mgmtPool;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
SMWorkerPool writePool;
taos_queue pMgmtQ;
SSteps *pSteps;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
} tsVnodes;
void dnodeGetVnodes(SVnodeLoads *pVloads) {} static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode);
\ No newline at end of file static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode);
static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode);
static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode);
static SVnodeObj *dnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&tsVnodes.latch);
taosHashGetClone(tsVnodes.hash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
} else {
refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
}
taosRUnLockLatch(&tsVnodes.latch);
dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount);
return pVnode;
}
static void dnodeReleaseVnode(SVnodeObj *pVnode) {
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount);
}
static int32_t dnodeCreateVnodeWrapper(int32_t vgId, SVnode *pImpl) {
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
pVnode->vgId = vgId;
pVnode->refCount = 0;
pVnode->dropped = 0;
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
pVnode->pImpl = pImpl;
int32_t code = dnodeAllocVnodeQueryQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeFetchQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeWriteQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeApplyQueue(pVnode);
if (code != 0) {
return code;
}
code = dnodeAllocVnodeSyncQueue(pVnode);
if (code != 0) {
return code;
}
taosWLockLatch(&tsVnodes.latch);
code = taosHashPut(tsVnodes.hash, &vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
taosWUnLockLatch(&tsVnodes.latch);
return code;
}
static void dnodeDropVnodeWrapper(SVnodeObj *pVnode) {
taosWLockLatch(&tsVnodes.latch);
taosHashRemove(tsVnodes.hash, &pVnode->vgId, sizeof(int32_t));
taosWUnLockLatch(&tsVnodes.latch);
// wait all queue empty
dnodeReleaseVnode(pVnode);
while (pVnode->refCount > 0) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
dnodeFreeVnodeQueryQueue(pVnode);
dnodeFreeVnodeFetchQueue(pVnode);
dnodeFreeVnodeWriteQueue(pVnode);
dnodeFreeVnodeApplyQueue(pVnode);
dnodeFreeVnodeSyncQueue(pVnode);
}
static SVnodeObj **dnodeGetVnodesFromHash(int32_t *numOfVnodes) {
taosRLockLatch(&tsVnodes.latch);
int32_t num = 0;
int32_t size = taosHashGetSize(tsVnodes.hash);
SVnodeObj **pVnodes = calloc(size, sizeof(SVnodeObj *));
void *pIter = taosHashIterate(tsVnodes.hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
SVnodeObj *pVnode = *ppVnode;
if (pVnode) {
num++;
if (num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, accquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes[num] = (*ppVnode);
}
}
pIter = taosHashIterate(tsVnodes.hash, pIter);
}
taosRUnLockLatch(&tsVnodes.latch);
*numOfVnodes = num;
return pVnodes;
}
static int32_t dnodeGetVnodesFromFile(SVnodeObj **ppVnodes, int32_t *numOfVnodes) {
int32_t code = TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
char file[PATH_MAX + 20] = {0};
SVnodeObj *pVnodes = NULL;
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_VNODE_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", file);
goto PRASE_VNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", file);
goto PRASE_VNODE_OVER;
}
cJSON *vnodes = cJSON_GetObjectItem(root, "vnodes");
if (!vnodes || vnodes->type != cJSON_Array) {
dError("failed to read %s since vnodes not found", file);
goto PRASE_VNODE_OVER;
}
int32_t vnodesNum = cJSON_GetArraySize(vnodes);
if (vnodesNum <= 0) {
dError("failed to read %s since vnodes size:%d invalid", file, vnodesNum);
goto PRASE_VNODE_OVER;
}
pVnodes = calloc(vnodesNum, sizeof(SVnodeObj));
if (pVnodes == NULL) {
dError("failed to read %s since out of memory", file);
goto PRASE_VNODE_OVER;
}
for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
SVnodeObj *pVnode = &pVnodes[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
if (!vgId || vgId->type != cJSON_String) {
dError("failed to read %s since vgId not found", file);
goto PRASE_VNODE_OVER;
}
pVnode->vgId = atoi(vgId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(vnode, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", file);
goto PRASE_VNODE_OVER;
}
pVnode->dropped = atoi(vnode->valuestring);
}
code = 0;
dInfo("succcessed to read file %s", file);
PRASE_VNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
return code;
}
static int32_t dnodeWriteVnodesToFile() {
char file[PATH_MAX + 20] = {0};
char realfile[PATH_MAX + 20] = {0};
snprintf(file, PATH_MAX + 20, "%s/vnodes.json.bak", tsVnodeDir);
snprintf(realfile, PATH_MAX + 20, "%s/vnodes.json", tsVnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s since %s", file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [{\n");
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
len += snprintf(content + len, maxLen - len, " \"vgId\": \"%d\",\n", pVnode->vgId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", pVnode->dropped);
if (i < numOfVnodes - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
for (int32_t i = 0; i < numOfVnodes; ++i) {
SVnodeObj *pVnode = pVnodes[i];
dnodeReleaseVnode(pVnode);
}
if (pVnodes != NULL) {
free(pVnodes);
}
dInfo("successed to write %s", file);
return taosRenameFile(file, realfile);
}
static int32_t dnodeCreateVnode(int32_t vgId, SVnodeCfg *pCfg) {
int32_t code = 0;
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, vgId);
SVnode *pImpl = vnodeCreate(vgId, path, pCfg);
if (pImpl == NULL) {
code = terrno;
return code;
}
code = dnodeCreateVnodeWrapper(vgId, pImpl);
if (code != 0) {
vnodeDrop(pImpl);
return code;
}
code = dnodeWriteVnodesToFile();
if (code != 0) {
vnodeDrop(pImpl);
return code;
}
return code;
}
static int32_t dnodeDropVnode(SVnodeObj *pVnode) {
pVnode->dropped = 1;
int32_t code = dnodeWriteVnodesToFile();
if (code != 0) {
pVnode->dropped = 0;
return code;
}
dnodeDropVnodeWrapper(pVnode);
vnodeDrop(pVnode->pImpl);
dnodeWriteVnodesToFile();
return 0;
}
static void *dnodeOpenVnodeFunc(void *param) {
SVThread *pThread = param;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
SVnodeObj *pVnode = &pThread->pVnodes[v];
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to restore, %d of %d have been opened", pVnode->vgId,
tsVnodes.openVnodes, tsVnodes.totalVnodes);
dnodeReportStartup("open-vnodes", stepDesc);
char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path),"%s/vnode%d", tsVnodeDir, pVnode->vgId);
SVnode *pImpl = vnodeOpen(pVnode->vgId, path);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->failed++;
} else {
dnodeCreateVnodeWrapper(pVnode->vgId, pImpl);
dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex);
pThread->opened++;
}
atomic_add_fetch_32(&tsVnodes.openVnodes, 1);
}
dDebug("thread:%d, total vnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
pThread->failed);
return NULL;
}
static int32_t dnodeOpenVnodes() {
taosInitRWLatch(&tsVnodes.latch);
tsVnodes.hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodes.hash == NULL) {
dError("failed to init vnode hash");
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
SVnodeObj *pVnodes = NULL;
int32_t numOfVnodes = 0;
int32_t code = dnodeGetVnodesFromFile(&pVnodes, &numOfVnodes);
if (code != TSDB_CODE_SUCCESS) {
dInfo("failed to get vnode list from disk since %s", tstrerror(code));
return code;
}
tsVnodes.totalVnodes = numOfVnodes;
int32_t threadNum = tsNumOfCores;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVThread *threads = calloc(threadNum, sizeof(SVThread));
for (int32_t t = 0; t < threadNum; ++t) {
threads[t].threadIndex = t;
threads[t].pVnodes = calloc(vnodesPerThread, sizeof(SVnodeObj));
}
for (int32_t v = 0; v < numOfVnodes; ++v) {
int32_t t = v % threadNum;
SVThread *pThread = &threads[t];
pThread->pVnodes[pThread->vnodeNum++] = pVnodes[v];
}
dInfo("start %d threads to open %d vnodes", threadNum, numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) {
SVThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue;
pThread->threadId = taosCreateThread(dnodeOpenVnodeFunc, pThread);
if (pThread->threadId == NULL) {
dError("thread:%d, failed to create thread to open vnode, reason:%s", pThread->threadIndex, strerror(errno));
}
}
for (int32_t t = 0; t < threadNum; ++t) {
SVThread *pThread = &threads[t];
taosDestoryThread(pThread->threadId);
pThread->threadId = NULL;
free(pThread->pVnodes);
}
free(threads);
if (tsVnodes.openVnodes != tsVnodes.totalVnodes) {
dError("there are total vnodes:%d, opened:%d", tsVnodes.totalVnodes, tsVnodes.openVnodes);
return -1;
} else {
dInfo("total vnodes:%d open successfully", tsVnodes.totalVnodes);
}
return TSDB_CODE_SUCCESS;
}
static void dnodeCloseVnodes() {
int32_t numOfVnodes = 0;
SVnodeObj **pVnodes = dnodeGetVnodesFromHash(&numOfVnodes);
for (int32_t i = 0; i < numOfVnodes; ++i) {
dnodeDropVnodeWrapper(pVnodes[i]);
}
if (pVnodes != NULL) {
free(pVnodes);
}
if (tsVnodes.hash != NULL) {
taosHashCleanup(tsVnodes.hash);
tsVnodes.hash = NULL;
}
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static int32_t dnodeParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont;
*vgId = htonl(pCreate->vgId);
tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->totalBlocks = htonl(pCreate->totalBlocks);
pCfg->daysPerFile = htonl(pCreate->daysPerFile);
pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0);
pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCfg->precision = pCreate->precision;
pCfg->compression = pCreate->compression;
pCfg->cacheLastRow = pCreate->cacheLastRow;
pCfg->update = pCreate->update;
pCfg->quorum = pCreate->quorum;
pCfg->replica = pCreate->replica;
pCfg->walLevel = pCreate->walLevel;
pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod);
for (int32_t i = 0; i < pCfg->replica; ++i) {
pCfg->replicas[i].port = htons(pCreate->replicas[i].port);
tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
}
return 0;
}
static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = rpcMsg->pCont;
pAuth->vgId = htonl(pAuth->vgId);
return pAuth;
}
static int32_t vnodeProcessCreateVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
dDebug("vgId:%d, create vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", vgId);
dnodeReleaseVnode(pVnode);
return 0;
}
int32_t code = dnodeCreateVnode(vgId, &vnodeCfg);
if (code != 0) {
dError("vgId:%d, failed to create vnode since %s", vgId, tstrerror(code));
}
return code;
}
static int32_t vnodeProcessAlterVnodeReq(SRpcMsg *rpcMsg) {
SVnodeCfg vnodeCfg = {0};
int32_t vgId = 0;
int32_t code = 0;
dnodeParseCreateVnodeReq(rpcMsg, &vgId, &vnodeCfg);
dDebug("vgId:%d, alter vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
return code;
}
code = vnodeAlter(pVnode->pImpl, &vnodeCfg);
if (code != 0) {
dError("vgId:%d, failed to alter vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessDropVnodeReq(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pDrop->vgId;
dDebug("vgId:%d, drop vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to drop since %s", vgId, tstrerror(code));
return code;
}
code = dnodeDropVnode(pVnode);
if (code != 0) {
dnodeReleaseVnode(pVnode);
dError("vgId:%d, failed to drop vnode since %s", vgId, tstrerror(code));
}
return code;
}
static int32_t vnodeProcessAuthVnodeReq(SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pAuth->vgId;
dDebug("vgId:%d, auth vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code));
return code;
}
pVnode->accessState = pAuth->accessState;
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessSyncVnodeReq(SRpcMsg *rpcMsg) {
SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pAuth->vgId;
dDebug("vgId:%d, auth vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to auth since %s", vgId, tstrerror(code));
return code;
}
code = vnodeSync(pVnode->pImpl);
if (code != 0) {
dError("vgId:%d, failed to auth vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t vnodeProcessCompactVnodeReq(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg);
int32_t code = 0;
int32_t vgId = pCompact->vgId;
dDebug("vgId:%d, compact vnode req is received", vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
code = terrno;
dDebug("vgId:%d, failed to compact since %s", vgId, tstrerror(code));
return code;
}
code = vnodeCompact(pVnode->pImpl);
if (code != 0) {
dError("vgId:%d, failed to compact vnode since %s", vgId, tstrerror(code));
}
dnodeReleaseVnode(pVnode);
return code;
}
static void dnodeProcessVnodeMgmtQueue(void *unused, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_VNODE_IN:
code = vnodeProcessCreateVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_ALTER_VNODE_IN:
code = vnodeProcessAlterVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_DROP_VNODE_IN:
code = vnodeProcessDropVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_AUTH_VNODE_IN:
code = vnodeProcessAuthVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_SYNC_VNODE_IN:
code = vnodeProcessSyncVnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_COMPACT_VNODE_IN:
code = vnodeProcessCompactVnodeReq(pMsg);
break;
default:
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
break;
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dnodeProcessVnodeQueryQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_QUERY);
}
static void dnodeProcessVnodeFetchQueue(SVnodeObj *pVnode, SVnodeMsg *pMsg) {
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_FETCH);
}
static void dnodeProcessVnodeWriteQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = vnodeInitMsg(numOfMsgs);
SRpcMsg *pRpcMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg);
vnodeAppendMsg(pMsg, pRpcMsg);
taosFreeQitem(pRpcMsg);
}
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_WRITE);
}
static void dnodeProcessVnodeApplyQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_APPLY);
}
}
static void dnodeProcessVnodeSyncQueue(SVnodeObj *pVnode, taos_qall qall, int32_t numOfMsgs) {
SVnodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
vnodeProcessMsg(pVnode->pImpl, pMsg, VN_MSG_TYPE_SYNC);
}
}
static int32_t dnodeWriteRpcMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0;
if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg));
if (pMsg == NULL) {
code = TSDB_CODE_DND_OUT_OF_MEMORY;
} else {
*pMsg = *pRpcMsg;
code = taosWriteQitem(pQueue, pMsg);
}
}
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
rpcFreeCont(pRpcMsg->pCont);
}
}
static int32_t dnodeWriteVnodeMsgToVnodeQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0;
if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else {
SVnodeMsg *pMsg = vnodeInitMsg(1);
if (pMsg == NULL) {
code = TSDB_CODE_DND_OUT_OF_MEMORY;
} else {
vnodeAppendMsg(pMsg, pRpcMsg);
code = taosWriteQitem(pQueue, pMsg);
}
}
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
rpcFreeCont(pRpcMsg->pCont);
}
}
static SVnodeObj *dnodeAcquireVnodeFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = (SMsgHead *)pMsg->pCont;
pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = dnodeAcquireVnode(pHead->vgId);
if (pVnode == NULL) {
SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
}
return pVnode;
}
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteRpcMsgToVnodeQueue(tsVnodes.pMgmtQ, pMsg); }
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteVnodeMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteVnodeMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dnodeAcquireVnodeFromMsg(pMsg);
if (pVnode != NULL) {
dnodeWriteVnodeMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
dnodeReleaseVnode(pVnode);
}
}
static int32_t dnodePutMsgIntoVnodeApplyQueue(int32_t vgId, SVnodeMsg *pMsg) {
SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) {
return terrno;
}
int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg);
dnodeReleaseVnode(pVnode);
return code;
}
static int32_t dnodeInitVnodeMgmtWorker() {
SWorkerPool *pPool = &tsVnodes.mgmtPool;
pPool->name = "vnode-mgmt";
pPool->min = 1;
pPool->max = 1;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
tsVnodes.pMgmtQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)dnodeProcessVnodeMgmtQueue);
if (tsVnodes.pMgmtQ == NULL) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeMgmtWorker() {
tWorkerFreeQueue(&tsVnodes.mgmtPool, tsVnodes.pMgmtQ);
tWorkerCleanup(&tsVnodes.mgmtPool);
tsVnodes.pMgmtQ = NULL;
}
static int32_t dnodeAllocVnodeQueryQueue(SVnodeObj *pVnode) {
pVnode->pQueryQ = tWorkerAllocQueue(&tsVnodes.queryPool, pVnode, (FProcessItem)dnodeProcessVnodeQueryQueue);
if (pVnode->pQueryQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeQueryQueue(SVnodeObj *pVnode) {
tWorkerFreeQueue(&tsVnodes.queryPool, pVnode->pQueryQ);
pVnode->pQueryQ = NULL;
}
static int32_t dnodeAllocVnodeFetchQueue(SVnodeObj *pVnode) {
pVnode->pFetchQ = tWorkerAllocQueue(&tsVnodes.fetchPool, pVnode, (FProcessItem)dnodeProcessVnodeFetchQueue);
if (pVnode->pFetchQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeFetchQueue(SVnodeObj *pVnode) {
tWorkerFreeQueue(&tsVnodes.fetchPool, pVnode->pFetchQ);
pVnode->pFetchQ = NULL;
}
static int32_t dnodeInitVnodeReadWorker() {
int32_t maxFetchThreads = 4;
float threadsForQuery = MAX(tsNumOfCores * tsRatioOfQueryCores, 1);
SWorkerPool *pPool = &tsVnodes.queryPool;
pPool->name = "vnode-query";
pPool->min = (int32_t)threadsForQuery;
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
pPool = &tsVnodes.fetchPool;
pPool->name = "vnode-fetch";
pPool->min = MIN(maxFetchThreads, tsNumOfCores);
pPool->max = pPool->min;
if (tWorkerInit(pPool) != 0) {
TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeReadWorker() {
tWorkerCleanup(&tsVnodes.fetchPool);
tWorkerCleanup(&tsVnodes.queryPool);
}
static int32_t dnodeAllocVnodeWriteQueue(SVnodeObj *pVnode) {
pVnode->pWriteQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeWriteQueue);
if (pVnode->pWriteQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeWriteQueue(SVnodeObj *pVnode) {
tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pWriteQ);
pVnode->pWriteQ = NULL;
}
static int32_t dnodeAllocVnodeApplyQueue(SVnodeObj *pVnode) {
pVnode->pApplyQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeApplyQueue);
if (pVnode->pApplyQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeApplyQueue(SVnodeObj *pVnode) {
tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pApplyQ);
pVnode->pApplyQ = NULL;
}
static int32_t dnodeInitVnodeWriteWorker() {
SMWorkerPool *pPool = &tsVnodes.writePool;
pPool->name = "vnode-write";
pPool->max = tsNumOfCores;
if (tMWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeWriteWorker() { tMWorkerCleanup(&tsVnodes.writePool); }
static int32_t dnodeAllocVnodeSyncQueue(SVnodeObj *pVnode) {
pVnode->pSyncQ = tMWorkerAllocQueue(&tsVnodes.writePool, pVnode, (FProcessItems)dnodeProcessVnodeSyncQueue);
if (pVnode->pSyncQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeVnodeSyncQueue(SVnodeObj *pVnode) {
tMWorkerFreeQueue(&tsVnodes.writePool, pVnode->pSyncQ);
pVnode->pSyncQ = NULL;
}
static int32_t dnodeInitVnodeSyncWorker() {
int32_t maxThreads = tsNumOfCores / 2;
if (maxThreads < 1) maxThreads = 1;
SMWorkerPool *pPool = &tsVnodes.writePool;
pPool->name = "vnode-sync";
pPool->max = maxThreads;
if (tMWorkerInit(pPool) != 0) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); }
static int32_t dnodeInitVnodeModule() {
SVnodePara para;
para.SendMsgToDnode = dnodeSendMsgToDnode;
para.SendMsgToMnode = dnodeSendMsgToMnode;
para.PutMsgIntoApplyQueue = dnodePutMsgIntoVnodeApplyQueue;
return vnodeInit(para);
}
int32_t dnodeInitVnodes() {
dInfo("dnode-vnodes start to init");
SSteps *pSteps = taosStepInit(3, dnodeReportStartup);
taosStepAdd(pSteps, "dnode-vnode-env", dnodeInitVnodeModule, vnodeCleanup);
taosStepAdd(pSteps, "dnode-vnode-mgmt", dnodeInitVnodeMgmtWorker, dnodeCleanupVnodeMgmtWorker);
taosStepAdd(pSteps, "dnode-vnode-read", dnodeInitVnodeReadWorker, dnodeCleanupVnodeReadWorker);
taosStepAdd(pSteps, "dnode-vnode-write", dnodeInitVnodeWriteWorker, dnodeCleanupVnodeWriteWorker);
taosStepAdd(pSteps, "dnode-vnode-sync", dnodeInitVnodeSyncWorker, dnodeCleanupVnodeSyncWorker);
taosStepAdd(pSteps, "dnode-vnodes", dnodeOpenVnodes, dnodeCleanupVnodes);
tsVnodes.pSteps = pSteps;
return taosStepExec(pSteps);
}
void dnodeCleanupVnodes() {
if (tsVnodes.pSteps != NULL) {
dInfo("dnode-vnodes start to clean up");
taosStepCleanup(tsVnodes.pSteps);
tsVnodes.pSteps = NULL;
dInfo("dnode-vnodes is cleaned up");
}
}
void dnodeGetVnodeLoads(SVnodeLoads *pLoads) {
pLoads->num = taosHashGetSize(tsVnodes.hash);
int32_t v = 0;
void *pIter = taosHashIterate(tsVnodes.hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
if (ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode;
if (pVnode == NULL) continue;
SVnodeLoad *pLoad = &pLoads->data[v++];
vnodeGetLoad(pVnode->pImpl, pLoad);
pLoad->vgId = htonl(pLoad->vgId);
pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
pLoad->tablesNum = htobe64(pLoad->tablesNum);
pIter = taosHashIterate(tsVnodes.hash, pIter);
}
}
...@@ -208,7 +208,7 @@ typedef struct { ...@@ -208,7 +208,7 @@ typedef struct {
typedef struct SDbObj { typedef struct SDbObj {
SdbHead head; SdbHead head;
char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char name[TSDB_FULL_DB_NAME_LEN];
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
...@@ -236,7 +236,7 @@ typedef struct SVgObj { ...@@ -236,7 +236,7 @@ typedef struct SVgObj {
int64_t updateTime; int64_t updateTime;
int32_t lbDnodeId; int32_t lbDnodeId;
int32_t lbTime; int32_t lbTime;
char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char dbName[TSDB_FULL_DB_NAME_LEN];
int8_t inUse; int8_t inUse;
int8_t accessState; int8_t accessState;
int8_t status; int8_t status;
...@@ -288,7 +288,7 @@ typedef struct { ...@@ -288,7 +288,7 @@ typedef struct {
void *pIter; void *pIter;
void *pVgIter; void *pVgIter;
void **ppShow; void **ppShow;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS];
char payload[]; char payload[];
......
...@@ -15,19 +15,58 @@ ...@@ -15,19 +15,58 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h"
int32_t vnodeInit() { return 0; } int32_t vnodeInit(SVnodePara para) { return 0; }
void vnodeCleanup() {} void vnodeCleanup() {}
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; }
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; }
SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; } SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }
void vnodeClose(SVnode *pVnode) {} void vnodeClose(SVnode *pVnode) {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
int32_t vnodeDrop(SVnode *pVnode) { return 0; } void vnodeDrop(SVnode *pVnode) {}
int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeCompact(SVnode *pVnode) { return 0; }
int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; }
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) {} int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
return NULL;
} else {
pMsg->allocNum = msgNum;
return pMsg;
}
}
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
if (pMsg->curNum >= pMsg->allocNum) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
}
void vnodeCleanupMsg(SVnodeMsg *pMsg) {
for (int32_t i = 0; i < pMsg->curNum; ++i) {
rpcFreeCont(pMsg->rpcMsg[i].pCont);
}
taosFreeQitem(pMsg);
}
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) {
switch (msgType) {
case VN_MSG_TYPE_WRITE:
break;
case VN_MSG_TYPE_APPLY:
break;
case VN_MSG_TYPE_SYNC:
break;
case VN_MSG_TYPE_QUERY:
break;
case VN_MSG_TYPE_FETCH:
break;
}
}
...@@ -1934,7 +1934,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) { ...@@ -1934,7 +1934,7 @@ char* cloneCurrentDBName(SSqlObj* pSql) {
case TAOS_REQ_FROM_HTTP: case TAOS_REQ_FROM_HTTP:
pCtx = pSql->param; pCtx = pSql->param;
if (pCtx && pCtx->db[0] != '\0') { if (pCtx && pCtx->db[0] != '\0') {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN] = {0}; char db[TSDB_FULL_DB_NAME_LEN] = {0};
int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db); int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db);
assert(len <= sizeof(db)); assert(len <= sizeof(db));
......
...@@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt ...@@ -235,6 +235,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message lengt
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories") TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting") TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR, "Parse vnodes.json error")
// vnode // vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress")
......
...@@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) { ...@@ -98,6 +98,20 @@ void taosCloseQueue(taos_queue param) {
uTrace("queue:%p is closed", queue); uTrace("queue:%p is closed", queue);
} }
bool taosQueueEmpty(taos_queue param) {
if (param == NULL) return true;
STaosQueue *queue = (STaosQueue *)param;
bool empty = false;
pthread_mutex_lock(&queue->mutex);
if (queue->head == NULL && queue->tail == NULL) {
empty = true;
}
pthread_mutex_destroy(&queue->mutex);
return empty;
}
void *taosAllocateQitem(int size) { void *taosAllocateQitem(int size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
......
...@@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) { ...@@ -76,7 +76,7 @@ static void *tWorkerThreadFp(SWorker *worker) {
} }
if (fp) { if (fp) {
(*fp)(msg, ahandle); (*fp)(ahandle, msg);
} }
} }
...@@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) { ...@@ -186,7 +186,7 @@ static void *tWriteWorkerThreadFp(SMWorker *worker) {
} }
if (fp) { if (fp) {
(*fp)(worker->qall, numOfMsgs, ahandle); (*fp)(ahandle, worker->qall, numOfMsgs);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册