提交 62743ebd 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3.0_liaohj

# Small Materialized Aggragates
**SMA** (**S**mall **M**aterialized **A**ggrates) is used to speed up the query process on materialized data cube in TDengine. TDengine 3.0 gives more flexibility on the SMA configurations.
There are two kinds of SMA in TDengine:
1. Block-wise SMA
2. Time-range-wise SMA
<!--
```plantuml
@startmindmap mind_map_test
* SMA
** Block-wise SMA
** Time-range-wise SMA
@endmindmap
``` -->
![SMA in TDengine 3.0](http://www.plantuml.com/plantuml/png/Kr1GK70eBaaiAidDp4l9JInG0D7nG4PyIMfn2HTGMa5B8TZN4SBIKd3AoK_ErYtFB4v55Wt9p4tLBKhCIqz5bN981HeACHW0)
## Block-wise SMA
Block-wise SMA is created by default when the data are committed. Since time-series data are saved as block data in files, a corresponding SMA is create when the data block is written. The default block-wise SMA includes:
1. sum(*)
2. max(*)
3. min(*)
By default, the system will create SMA for each column except those columns with type *binary* and *nchar*. However, users can change the behavior by the keyword **NOSMA** to disable the SMA for a certain column like below:
```SQL
# create a super table with the SMA on column b disabled
create table st (ts timestamp, a int, b int NOSMA, c double) tags (tg1 binary(10), tg2 int);
```
## Time-range-wise SMA
In addition to the default block-wise SMA, users can create their own SMAs ondemand. Below is an example to create a SMA.
```SQL
# create a SMA every 10 minutes with SMA of sum, max and min
create sma_indx sma_10min on st (sum(*), max(*), min(*), twa(*)) interval(10m);
```
Users can also drop a time-range-wise SMA like below:
```SQL
# drop the sma index
drop sma_index sma_5min on st;
```
**NOTE: Creating an SMA index is a heavy operation which may take a long time and block the write operation. So create the time-range-wise SMA when creating the table or when there are not too much data.**
\ No newline at end of file
......@@ -36,91 +36,104 @@ enum {
TSDB_MESSAGE_NULL = 0,
#endif
// message from client to dnode
// message from client to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_ACCT, "alter-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_ACCT, "drop-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_USER, "create-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_USER, "alter-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_DB, "sync-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_TOPIC, "create-topic" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TOPIC, "drop-topic" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TOPIC, "alter-topic" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" )
// message from client to qnode
// message from client to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message from vnode to vnode
// message from vnode to mnode
// message from vnode to qnode
// message from vnode to dnode
// message from mnode to vnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
// message from mnode to mnode
// message from mnode to qnode
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH_VNODE_IN, "auth-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
// message from qnode to vnode
// message from qnode to mnode
// message from qnode to qnode
// message from qnode to dnode
// message from dnode to vnode
// message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_AUTH, "auth" )
// message from dnode to qnode
// message from dnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_ACCT, "alter-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_ACCT, "drop-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_USER, "create-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_USER, "alter-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
// message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message for topic
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 147
......@@ -312,7 +325,7 @@ typedef struct {
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */
int32_t tagValLen;
......@@ -404,10 +417,6 @@ typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
} SDropSTableMsg;
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct SColIndex {
int16_t colId; // column id
int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag
......@@ -561,21 +570,7 @@ typedef struct SRetrieveTableRsp {
} SRetrieveTableRsp;
typedef struct {
int32_t vgId;
int32_t dbCfgVersion;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
uint64_t vnodeVersion;
int32_t vgCfgVersion;
uint8_t status;
uint8_t role;
uint8_t replica;
uint8_t compact;
} SVnodeLoad;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int32_t cacheBlockSize; //MB
int32_t totalBlocks;
int32_t maxTables;
......@@ -640,28 +635,46 @@ typedef struct {
uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg, SSyncDbMsg;
// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed
// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE
typedef struct {
int64_t pointsWritten; // In unit of points
int64_t totalStorage; // In unit of bytes
int64_t compStorage; // In unit of bytes
int64_t queryTime; // In unit of second ??
char reserved[64];
} SVnodeStatisticInfo;
int32_t statusInterval;
int8_t reserved[4];
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[TSDB_TIMEZONE_LEN]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct SVgroupAccess {
typedef struct {
int32_t vgId;
int8_t accessState;
} SVgroupAccess;
int8_t role;
int8_t reserved[3];
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int64_t tablesNum;
} SVnodeLoad;
typedef struct {
int32_t num;
SVnodeLoad data[];
} SVnodeLoads;
typedef struct SStatusMsg {
uint32_t sversion;
int32_t dnodeId;
int64_t clusterId;
uint32_t rebootTime; // time stamp for last reboot
int32_t numOfCores;
char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads;
} SStatusMsg;
typedef struct {
int32_t dnodeId;
int8_t dropped;
char reserved[19];
char reserved[3];
int64_t clusterId;
int32_t numOfDnodes;
int32_t numOfVnodes;
} SDnodeCfg;
typedef struct {
......@@ -677,32 +690,9 @@ typedef struct {
SDnodeEp dnodeEps[];
} SDnodeEps;
typedef struct {
int32_t statusInterval; // tsStatusInterval
int8_t reserved[36];
int64_t checkTime; // 1970-01-01 00:00:00.000
char timezone[64]; // tsTimezone
char locale[TSDB_LOCALE_LEN]; // tsLocale
char charset[TSDB_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct SStatusMsg {
uint32_t version;
int32_t dnodeId;
uint32_t lastReboot; // time stamp for last reboot
int32_t openVnodes;
int32_t numOfCores;
float diskAvailable;
int8_t reserved[36];
char dnodeEp[TSDB_EP_LEN];
int64_t clusterId;
SClusterCfg clusterCfg;
SVnodeLoad load[];
} SStatusMsg;
typedef struct {
SDnodeCfg dnodeCfg;
SVgroupAccess vgAccess[];
SDnodeEps dnodeEps;
} SStatusRsp;
typedef struct {
......@@ -711,7 +701,7 @@ typedef struct {
} SVnodeDesc;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
uint32_t vgId;
int32_t cacheBlockSize;
int32_t totalBlocks;
......@@ -731,9 +721,18 @@ typedef struct {
int8_t replica;
int8_t quorum;
int8_t selfIndex;
SVnodeDesc nodes[TSDB_MAX_REPLICA];
SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SCreateVnodeMsg, SAlterVnodeMsg;
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct {
int32_t vgId;
int8_t accessState;
} SAuthVnodeMsg;
typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN];
int16_t createFlag;
......@@ -805,13 +804,13 @@ typedef struct {
*/
typedef struct {
int8_t type;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
uint16_t payloadLen;
char payload[];
} SShowMsg;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
......@@ -827,9 +826,7 @@ typedef struct {
typedef struct {
int32_t dnodeId;
int32_t mnodeNum;
SDnodeEp mnodeEps[];
} SCreateMnodeMsg;
} SCreateMnodeMsg, SDropMnodeMsg;
typedef struct {
int32_t dnodeId;
......@@ -895,16 +892,7 @@ typedef struct {
typedef struct {
char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
uint64_t stime; // stream starting time
int32_t status;
char tableFname[TSDB_TABLE_FNAME_LEN];
} SAlterStreamMsg;
} SKillQueryMsg, SKillConnMsg;
typedef struct {
char user[TSDB_USER_LEN];
......@@ -919,8 +907,7 @@ typedef struct {
int8_t reserved1[7];
char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN];
char reserved2[64];
} SStartupStep;
} SStartupMsg;
// mq related
typedef struct {
......@@ -1008,6 +995,7 @@ typedef struct {
/* data */
} SUpdateTagValRsp;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -198,7 +198,7 @@ extern SDiskCfg tsDiskCfg[];
void taosInitGlobalCfg();
int32_t taosCheckGlobalCfg();
bool taosCfgDynamicOptions(char *msg);
int32_t taosCfgDynamicOptions(char *msg);
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
void taosAddDataDir(int index, char *v1, int level, int primary);
......
......@@ -22,22 +22,16 @@ extern "C" {
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1
extern int tsRpcHeadSize;
typedef struct SRpcEpSet {
int8_t inUse;
int8_t numOfEps;
uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet;
typedef struct SRpcCorEpSet {
int32_t version;
SRpcEpSet epSet;
SEpSet epSet;
} SRpcCorEpSet;
typedef struct SRpcConnInfo {
......@@ -72,7 +66,7 @@ typedef struct SRpcInit {
char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SRpcEpSet *);
void (*cfp)(SRpcMsg *, SEpSet *);
// call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
......@@ -85,11 +79,11 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_H_
#define _TD_DNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
struct SRpcEpSet;
struct SRpcMsg;
/**
* Initialize and start the dnode module.
*
* @return Error code.
*/
int32_t dnodeInit();
/**
* Stop and cleanup dnode module.
*/
void dnodeCleanup();
/**
* Send messages to other dnodes, such as create vnode message.
*
* @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent.
*/
void dnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/**
* Send messages to mnode, such as config message.
*
* @param rpcMsg, message to be sent.
*/
void dnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
/**
* Send redirect message to dnode or shell.
*
* @param rpcMsg, message to be sent.
* @param forShell, used to identify whether to send to shell or dnode.
*/
void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
/**
* Report the startup progress.
*/
void dnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_H_*/
......@@ -20,41 +20,23 @@
extern "C" {
#endif
typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus;
typedef struct {
/**
* Send messages to other dnodes, such as create vnode message.
*
* @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent.
*/
void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
int64_t numOfDnode;
int64_t numOfMnode;
int64_t numOfVgroup;
int64_t numOfDatabase;
int64_t numOfSuperTable;
int64_t numOfChildTable;
int64_t numOfColumn;
int64_t totalPoints;
int64_t totalStorage;
int64_t compStorage;
} SMnodeStat;
/**
* Send messages to mnode, such as config message.
*
* @param rpcMsg, message to be sent.
*/
typedef struct {
void (*SendMsgToDnode)(struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg);
/**
* Send redirect message to dnode or shell.
*
* @param rpcMsg, message to be sent.
* @param forShell, used to identify whether to send to shell or dnode.
*/
void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnode, the instance of dDnode module.
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
} SMnodeFp;
......@@ -64,77 +46,16 @@ typedef struct {
int32_t dnodeId;
} SMnodePara;
/**
* Initialize and start mnode module.
*
* @param para, initialization parameters.
* @return Error code.
*/
int32_t mnodeInit(SMnodePara para);
/**
* Stop and cleanup mnode module.
*/
void mnodeCleanup();
/**
* Deploy mnode instances in dnode.
*
* @return Error Code.
*/
int32_t mnodeDeploy();
/**
* Delete the mnode instance deployed in dnode.
*/
void mnodeUnDeploy();
int32_t mnodeStart();
void mnodeStop();
/**
* Whether the mnode is in service.
*
* @return Server status.
*/
EMnStatus mnodeGetStatus();
typedef struct {
int64_t numOfDnode;
int64_t numOfMnode;
int64_t numOfVgroup;
int64_t numOfDatabase;
int64_t numOfSuperTable;
int64_t numOfChildTable;
int64_t numOfColumn;
int64_t totalPoints;
int64_t totalStorage;
int64_t compStorage;
} SMnodeStat;
/**
* Get the statistical information of Mnode.
*
* @param stat, statistical information.
* @return Error Code.
*/
int32_t mnodeGetStatistics(SMnodeStat *stat);
/**
* Get the auth information of Mnode.
*
* @param user, username.
* @param spi, security parameter index.
* @param encrypt, encrypt algorithm.
* @param secret, key for authentication.
* @param ckey, ciphering key.
* @return Error Code.
*/
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
/**
* Interface for processing messages.
*
* @param rpcMsg, message to be processed.
* @return Error code.
*/
void mnodeProcessMsg(SRpcMsg *rpcMsg);
#ifdef __cplusplus
......
......@@ -19,6 +19,9 @@
#include "os.h"
#include "tutil.h"
#define TQ_ACTION_INSERT 0x7f7f7f7fULL
#define TQ_ACTION_DELETE 0x80808080ULL
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -13,17 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MQTT_PLYLOAD_H
#define TDENGINE_MQTT_PLYLOAD_H
#ifndef _TD_TSDB_IMPL_H_
#define _TD_TSDB_IMPL_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
char* mqttConverJsonToSql(char* json, int maxSize);
struct STsdbOptions {
/* TODO */
};
#ifdef __cplusplus
}
#endif
#endif
#endif /*_TD_TSDB_IMPL_H_*/
\ No newline at end of file
......@@ -16,67 +16,24 @@
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include "os.h"
#include "taosmsg.h"
#include "impl/tsdbImpl.h"
#ifdef __cplusplus
extern "C" {
#endif
// Types exported
// TYPES EXPOSED
typedef struct STsdb STsdb;
typedef struct STsdbOptions STsdbOptions;
typedef struct STsdbSMAOptions STsdbSMAOptions; // SMA stands for Small Materialized Aggregation
typedef struct STsdbReadOptions STsdbReadOptions;
typedef struct STsdbSnapshot STsdbSnapshot;
typedef struct STsdbQueryHandle STsdbQueryHandle;
// DB operations
int tsdbCreate(const char *path);
int tsdbDestroy(const char *path);
STsdb *tsdbOpen(const STsdbOptions *options);
// STsdb
STsdb *tsdbOpen(const char *path, const STsdbOptions *);
void tsdbClose(STsdb *);
int tsdbReset(STsdb *, const STsdbOptions *);
int tsdbInsert(STsdb *, SSubmitReq *, SSubmitRsp *);
int tsdbCommit(STsdb *);
int tsdbCompact(STsdb *);
void tsdbRemove(const char *path);
// Options
STsdbOptions *tsdbOptionsCreate();
void tsdbOptionsDestroy(STsdbOptions *);
void tsdbOptionsSetId(STsdbOptions *, int id);
void tsdbOptionsSetHoursPerFile(STsdbOptions *, int hours);
void tsdbOptionsSetRetention(STsdbOptions *, int keep, int keep1, int keep2);
void tsdbOptionsSetMinAndMaxRows(STsdbOptions *, int minRows, int maxRows);
void tsdbOptionsSetPrecision(STsdbOptions *, int);
void tsdbOptionsSetCache(STsdbOptions *, int);
typedef enum { TSDB_NO_UPDATE = 0, TSDB_WHOLE_ROW_UPDATE = 1, TSDB_PARTIAL_ROW_UPDATE = 2 } ETsdbUpdateType;
void tsdbOptionsSetUpdate(STsdbOptions *, ETsdbUpdateType);
void tsdbOptionsSetSMA(STsdbOptions *, STsdbSMAOptions *);
// STsdbSMAOptions
STsdbSMAOptions *tsdbSMAOptionsCreate();
void tsdbSMAOptionsDestroy(STsdbSMAOptions *);
// void tsdbSMAOptionsSetFuncs(STsdbSMAOptions *, SArray * /*Array of function to perform on each block*/);
// void tsdbSMAOptionsSetIntervals(STsdbSMAOptions *, SArray *);
// void tsdbSMAOptionsSetColTypes(STsdbSMAOptions *, SArray *);
// STsdbQueryHandle
STsdbQueryHandle *tsdbQueryHandleCreate(STsdb *, STsdbReadOptions *);
void tsdbQueryHandleDestroy(STsdbQueryHandle *);
void tsdbResetQueryHandle(STsdbQueryHandle *, STsdbReadOptions *);
bool tsdbNextDataBlock(STsdbQueryHandle *);
// void tsdbGetDataBlockInfo(STsdbQueryHandle *, SDataBlockInfo *);
// void tsdbGetDataBlockStatisInfo(STsdbQueryHandle *, SDataStatis **);
// STsdbReadOptions
STsdbReadOptions *tsdbReadOptionsCreate();
void tsdbReadOptionsDestroy(STsdbReadOptions *);
void tsdbReadOptionsSetSnapshot(STsdbReadOptions *, STsdbSnapshot *);
// STsdbSnapshot
STsdbSnapshot *tsdbSnapshotCreate(STsdb *);
void tsdbSnapshotDestroy(STsdbSnapshot *);
// STsdbOptions
int tsdbOptionsInit(STsdbOptions *);
void tsdbOptionsClear(STsdbOptions *);
#ifdef __cplusplus
}
......
......@@ -16,93 +16,74 @@
#ifndef _TD_VNODE_H_
#define _TD_VNODE_H_
#include "os.h"
#include "taosmsg.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
/**
* Send messages to other dnodes, such as create vnode message.
*
* @param epSet, the endpoint list of dnodes.
* @param rpcMsg, message to be sent.
*/
void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/**
* Send messages to mnode, such as config message.
*
* @param rpcMsg, message to be sent.
*/
void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
typedef struct SVnode SVnode;
/**
* Report the startup progress.
*/
void (*ReportStartup)(char *name, char *desc);
typedef struct {
char db[TSDB_FULL_DB_NAME_LEN];
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
int8_t quorum;
int8_t replica;
int8_t walLevel;
int32_t fsyncPeriod; // millisecond
SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SVnodeCfg;
typedef enum {
VN_MSG_TYPE_WRITE = 1,
VN_MSG_TYPE_APPLY,
VN_MSG_TYPE_SYNC,
VN_MSG_TYPE_QUERY,
VN_MSG_TYPE_FETCH
} EVnMsgType;
} SVnodeFp;
typedef struct {
int32_t curNum;
int32_t allocNum;
SRpcMsg rpcMsg[];
} SVnodeMsg;
typedef struct {
SVnodeFp fp;
void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg);
void (*SendMsgToMnode)(SRpcMsg *pMsg);
int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg);
} SVnodePara;
/**
* Start initialize vnode module.
*
* @param para, initialization parameters.
* @return Error code.
*/
int32_t vnodeInit(SVnodePara para);
/**
* Cleanup vnode module.
*/
int32_t vnodeInit(SVnodePara);
void vnodeCleanup();
typedef struct {
int32_t unused;
} SVnodeStat;
/**
* Get the statistical information of vnode.
*
* @param stat, statistical information.
* @return Error Code.
*/
int32_t vnodeGetStatistics(SVnodeStat *stat);
/**
* Get the status of all vnodes.
*
* @param status, status msg.
*/
void vnodeGetStatus(struct SStatusMsg *status);
SVnode *vnodeOpen(int32_t vgId, const char *path);
void vnodeClose(SVnode *pVnode);
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
void vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
/**
* Set access permissions for all vnodes.
*
* @param access, access permissions of vnodes.
* @param numOfVnodes, the size of vnodes.
*/
void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/**
* Interface for processing messages.
*
* @param msg, message to be processed.
*/
void vnodeProcessMsg(SRpcMsg *msg);
SVnodeMsg *vnodeInitMsg(int32_t msgNum);
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg);
void vnodeCleanupMsg(SVnodeMsg *pMsg);
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType);
#ifdef __cplusplus
}
......
......@@ -216,11 +216,14 @@ int32_t* taosGetErrno();
// dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed")
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0402) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0403) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0405) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0406) //"Dnode is exiting"
#define TSDB_CODE_DND_DNODE_ID_NOT_MATCHED TAOS_DEF_ERROR_CODE(0, 0x0402) //"Dnode Id not matched")
#define TSDB_CODE_DND_MNODE_ALREADY_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0404) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0405) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0406) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0407) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0408) //"Dnode is exiting"
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0409) //"Parse vnodes.json error")
// vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
......
......@@ -162,11 +162,12 @@ do { \
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_FUNC_BUF_SIZE 512
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_TABLE_FNAME_LEN (TSDB_FULL_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
......@@ -220,7 +221,7 @@ do { \
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 64
#define TSDB_MAX_VNODES 2048
#define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 2
#define TSDB_MAX_VNODES_PER_DB 64
......
......@@ -37,21 +37,25 @@ shall be used to set up the protection.
*/
typedef void* taos_queue;
typedef void* taos_qset;
typedef void* taos_qall;
typedef void *taos_queue;
typedef void *taos_qset;
typedef void *taos_qall;
typedef void (*FProcessItem)(void *ahandle, void *pItem);
typedef void (*FProcessItems)(void *ahandle, taos_qall qall, int numOfItems);
taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue);
void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems);
void *taosAllocateQitem(int size);
void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem);
void taosFreeQitem(void *pItem);
int taosWriteQitem(taos_queue, void *pItem);
int taosReadQitem(taos_queue, void **pItem);
bool taosQueueEmpty(taos_queue);
taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
int taosReadAllQitems(taos_queue, taos_qall);
int taosGetQitem(taos_qall, int *type, void **pitem);
int taosGetQitem(taos_qall, void **pItem);
void taosResetQitems(taos_qall);
taos_qset taosOpenQset();
......@@ -61,8 +65,8 @@ int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *);
int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param);
......
......@@ -22,13 +22,6 @@
extern "C" {
#endif
typedef int32_t (*ProcessStartFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef void (*ProcessEndFp)(void *ahandle, void *pMsg, int32_t qtype, int32_t code);
typedef bool (*ProcessWriteStartFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef void (*ProcessWriteSyncFp)(void *ahandle, int32_t code);
typedef void (*ProcessWriteEndFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef struct SWorker {
int32_t id; // worker ID
pthread_t thread; // thread
......@@ -40,41 +33,36 @@ typedef struct SWorkerPool {
int32_t min; // min number of workers
int32_t num; // current number of workers
taos_qset qset;
const char * name;
ProcessStartFp startFp;
ProcessEndFp endFp;
SWorker * workers;
const char *name;
SWorker *workers;
pthread_mutex_t mutex;
} SWorkerPool;
typedef struct SWriteWorker {
typedef struct SMWorker {
int32_t id; // worker id
pthread_t thread; // thread
taos_qall qall;
taos_qset qset; // queue set
struct SWriteWorkerPool *pool;
} SWriteWorker;
struct SMWorkerPool *pool;
} SMWorker;
typedef struct SWriteWorkerPool {
typedef struct SMWorkerPool {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
const char * name;
ProcessWriteStartFp startFp;
ProcessWriteSyncFp syncFp;
ProcessWriteEndFp endFp;
SWriteWorker * workers;
const char *name;
SMWorker *workers;
pthread_mutex_t mutex;
} SWriteWorkerPool;
} SMWorkerPool;
int32_t tWorkerInit(SWorkerPool *pool);
void tWorkerCleanup(SWorkerPool *pool);
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle);
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue);
int32_t tWriteWorkerInit(SWriteWorkerPool *pool);
void tWriteWorkerCleanup(SWriteWorkerPool *pool);
taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle);
void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue);
int32_t tMWorkerInit(SMWorkerPool *pool);
void tMWorkerCleanup(SMWorkerPool *pool);
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue);
#ifdef __cplusplus
}
......
......@@ -3,4 +3,4 @@ add_subdirectory(util)
add_subdirectory(common)
add_subdirectory(libs)
add_subdirectory(client)
add_subdirectory(server)
\ No newline at end of file
add_subdirectory(dnode)
\ No newline at end of file
......@@ -277,13 +277,13 @@ void taosSetAllDebugFlag() {
}
}
bool taosCfgDynamicOptions(char *msg) {
int32_t taosCfgDynamicOptions(char *msg) {
char *option, *value;
int32_t olen, vlen;
int32_t vint = 0;
paGetToken(msg, &option, &olen);
if (olen == 0) return false;;
if (olen == 0) return -1;;
paGetToken(option + olen + 1, &value, &vlen);
if (vlen == 0)
......@@ -324,18 +324,18 @@ bool taosCfgDynamicOptions(char *msg) {
uError("monitor can't be updated, for monitor not initialized");
}
}
return true;
return 0;
}
if (strncasecmp(cfg->option, "debugFlag", olen) == 0) {
taosSetAllDebugFlag();
}
return true;
return 0;
}
if (strncasecmp(option, "resetlog", 8) == 0) {
taosResetLog();
taosPrintGlobalCfg();
return true;
return 0;
}
if (strncasecmp(option, "resetQueryCache", 15) == 0) {
......
......@@ -201,7 +201,7 @@ int32_t tNameExtractFullName(const SName* name, char* dst) {
return -1;
}
int32_t len = snprintf(dst, TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname);
int32_t len = snprintf(dst, TSDB_FULL_DB_NAME_LEN, "%s.%s", name->acctId, name->dbname);
size_t tnameLen = strlen(name->tname);
if (tnameLen > 0) {
......
add_subdirectory(mnode)
add_subdirectory(vnode)
add_subdirectory(qnode)
add_subdirectory(dnode)
aux_source_directory(. TAOSD_SRC)
add_executable(taosd ${TAOSD_SRC})
target_link_libraries(
taosd
PUBLIC dnode
PUBLIC util
)
\ No newline at end of file
add_subdirectory(mgmt)
aux_source_directory(src DNODE_SRC)
add_library(dnode ${DNODE_SRC})
add_executable(taosd ${DNODE_SRC})
target_link_libraries(
dnode
taosd
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
......@@ -10,7 +10,7 @@ target_link_libraries(
PUBLIC taos
)
target_include_directories(
dnode
taosd
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/dnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
......@@ -13,30 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_EPS_H_
#define _TD_DNODE_EPS_H_
#ifndef _TD_DNODE_DNODE_H_
#define _TD_DNODE_DNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
int32_t dnodeInitDnode();
void dnodeCleanupDnode();
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeUpdateCfg(SDnodeCfg *data);
void dnodeUpdateDnodeEps(SDnodeEps *data);
void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet);
int32_t dnodeGetDnodeId();
int64_t dnodeGetClusterId();
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetMnodeEpSetForPeer(SEpSet *epSet);
void dnodeGetMnodeEpSetForShell(SEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_EPS_H_*/
\ No newline at end of file
#endif /*_TD_DNODE_DNODE_H_*/
\ No newline at end of file
......@@ -24,8 +24,6 @@ extern "C" {
#include "tglobal.h"
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
#include "dnode.h"
extern int32_t dDebugFlag;
......@@ -37,10 +35,17 @@ extern int32_t dDebugFlag;
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat;
typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeInit();
void dnodeCleanup();
EDnStat dnodeGetRunStat();
void dnodeSetRunStat();
void dnodeGetStartup(SStartupStep *);
void dnodeReportStartup(char *name, char *desc);
void dnodeReportStartupFinished(char *name, char *desc);
void dnodeGetStartup(SStartupMsg *);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_MNODE_H_
#define _TD_DNODE_MNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitMnode();
void dnodeCleanupMnode();
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_MNODE_H_*/
\ No newline at end of file
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_TRANS_H_
#define _TD_DNODE_TRANS_H_
#ifndef _TD_DNODE_TRANSPORT_H_
#define _TD_DNODE_TRANSPORT_H_
#ifdef __cplusplus
extern "C" {
......@@ -24,10 +24,10 @@ extern "C" {
int32_t dnodeInitTrans();
void dnodeCleanupTrans();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_TRANS_H_*/
#endif /*_TD_DNODE_TRANSPORT_H_*/
......@@ -13,23 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_STATUS_H_
#define _TD_DNODE_STATUS_H_
#ifndef _TD_DNODE_VNODES_H_
#define _TD_DNODE_VNODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitMsg();
void dnodeCleanupMsg();
void dnodeProcessStatusRsp(SRpcMsg *pMsg);
void dnodeProcessStartupReq(SRpcMsg *pMsg);
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
void dnodeGetVnodeLoads(SVnodeLoads *pVloads);
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_STATUS_H_*/
\ No newline at end of file
#endif /*_TD_DNODE_VNODES_H_*/
\ No newline at end of file
......@@ -12,9 +12,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "ulog.h"
#include "dnode.h"
#define _DEFAULT_SOURCE
#include "dnodeInt.h"
static bool stop = false;
static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; }
......@@ -31,17 +31,15 @@ int main(int argc, char const *argv[]) {
int32_t code = dnodeInit();
if (code != 0) {
uInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
dInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
exit(EXIT_FAILURE);
}
uInfo("Started TDengine service successfully.");
while (!stop) {
taosMsleep(100);
}
uInfo("TDengine is shut down!");
dInfo("TDengine is shut down!");
dnodeCleanup();
return 0;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "dnodeVnodes.h"
#include "cJSON.h"
#include "thash.h"
#include "tthread.h"
#include "ttime.h"
static struct {
int32_t dnodeId;
int64_t clusterId;
SDnodeEps *dnodeEps;
SHashObj *dnodeHash;
SEpSet mnodeEpSetForShell;
SEpSet mnodeEpSetForPeer;
char file[PATH_MAX + 20];
uint32_t rebootTime;
int8_t dropped;
int8_t threadStop;
pthread_t *threadId;
pthread_mutex_t mutex;
} tsDnode = {0};
int32_t dnodeGetDnodeId() {
int32_t dnodeId = 0;
pthread_mutex_lock(&tsDnode.mutex);
dnodeId = tsDnode.dnodeId;
pthread_mutex_unlock(&tsDnode.mutex);
return dnodeId;
}
int64_t dnodeGetClusterId() {
int64_t clusterId = 0;
pthread_mutex_lock(&tsDnode.mutex);
clusterId = tsDnode.clusterId;
pthread_mutex_unlock(&tsDnode.mutex);
return clusterId;
}
void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) {
pthread_mutex_lock(&tsDnode.mutex);
SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t));
if (pEp != NULL) {
if (port) *port = pEp->dnodePort;
if (fqdn) tstrncpy(fqdn, pEp->dnodeFqdn, TSDB_FQDN_LEN);
if (ep) snprintf(ep, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort);
}
pthread_mutex_unlock(&tsDnode.mutex);
}
void dnodeGetMnodeEpSetForPeer(SEpSet *pEpSet) {
pthread_mutex_lock(&tsDnode.mutex);
*pEpSet = tsDnode.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsDnode.mutex);
}
void dnodeGetMnodeEpSetForShell(SEpSet *pEpSet) {
pthread_mutex_lock(&tsDnode.mutex);
*pEpSet = tsDnode.mnodeEpSetForShell;
pthread_mutex_unlock(&tsDnode.mutex);
}
void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) {
int32_t msgType = pMsg->msgType;
SEpSet epSet = {0};
if (forShell) {
dnodeGetMnodeEpSetForShell(&epSet);
} else {
dnodeGetMnodeEpSetForPeer(&epSet);
}
dDebug("RPC %p, msg:%s is redirected, num:%d use:%d", pMsg->handle, taosMsg[msgType], epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) {
if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) ||
(epSet.port[i] == tsServerPort && forShell)) {
epSet.inUse = (i + 1) % epSet.numOfEps;
dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse);
}
}
epSet.port[i] = htons(epSet.port[i]);
}
rpcSendRedirectRsp(pMsg->handle, &epSet);
}
static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) {
if (pEpSet == NULL || pEpSet->numOfEps <= 0) {
dError("mnode is changed, but content is invalid, discard it");
return;
} else {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
}
pthread_mutex_lock(&tsDnode.mutex);
tsDnode.mnodeEpSetForPeer = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
}
tsDnode.mnodeEpSetForShell = *pEpSet;
pthread_mutex_unlock(&tsDnode.mutex);
}
static void dnodePrintDnodes() {
dDebug("print dnode endpoint list, num:%d", tsDnode.dnodeEps->dnodeNum);
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
}
}
static void dnodeResetDnodes(SDnodeEps *pEps) {
assert(pEps != NULL);
int32_t size = sizeof(SDnodeEps) + pEps->dnodeNum * sizeof(SDnodeEp);
if (pEps->dnodeNum > tsDnode.dnodeEps->dnodeNum) {
SDnodeEps *tmp = calloc(1, size);
if (tmp == NULL) return;
tfree(tsDnode.dnodeEps);
tsDnode.dnodeEps = tmp;
}
if (tsDnode.dnodeEps != pEps) {
memcpy(tsDnode.dnodeEps, pEps, size);
}
tsDnode.mnodeEpSetForPeer.inUse = 0;
tsDnode.mnodeEpSetForShell.inUse = 0;
int32_t mIndex = 0;
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; i++) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
if (!ep->isMnode) continue;
if (mIndex >= TSDB_MAX_REPLICA) continue;
strcpy(tsDnode.mnodeEpSetForShell.fqdn[mIndex], ep->dnodeFqdn);
strcpy(tsDnode.mnodeEpSetForPeer.fqdn[mIndex], ep->dnodeFqdn);
tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort;
tsDnode.mnodeEpSetForShell.port[mIndex] = ep->dnodePort + tsDnodeDnodePort;
mIndex++;
}
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
taosHashPut(tsDnode.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
}
dnodePrintDnodes();
}
static bool dnodeIsEpChanged(int32_t dnodeId, char *epStr) {
bool changed = false;
pthread_mutex_lock(&tsDnode.mutex);
SDnodeEp *pEp = taosHashGet(tsDnode.dnodeHash, &dnodeId, sizeof(int32_t));
if (pEp != NULL) {
char epSaved[TSDB_EP_LEN + 1];
snprintf(epSaved, TSDB_EP_LEN, "%s:%u", pEp->dnodeFqdn, pEp->dnodePort);
changed = strcmp(epStr, epSaved) != 0;
}
pthread_mutex_unlock(&tsDnode.mutex);
return changed;
}
static int32_t dnodeReadDnodes() {
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
fp = fopen(tsDnode.file, "r");
if (!fp) {
dDebug("file %s not exist", tsDnode.file);
goto PRASE_DNODE_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", tsDnode.file);
goto PRASE_DNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", tsDnode.file);
goto PRASE_DNODE_OVER;
}
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s since dnodeId not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
tsDnode.dnodeId = atoi(dnodeId->valuestring);
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
tsDnode.clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
tsDnode.dropped = atoi(dropped->valuestring);
cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
dError("failed to read %s since dnodeInfos not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
if (dnodeInfosSize <= 0) {
dError("failed to read %s since dnodeInfos size:%d invalid", tsDnode.file, dnodeInfosSize);
goto PRASE_DNODE_OVER;
}
tsDnode.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
if (tsDnode.dnodeEps == NULL) {
dError("failed to calloc dnodeEpList since %s", strerror(errno));
goto PRASE_DNODE_OVER;
}
tsDnode.dnodeEps->dnodeNum = dnodeInfosSize;
for (int32_t i = 0; i < dnodeInfosSize; ++i) {
cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
if (dnodeInfo == NULL) break;
SDnodeEp *pEp = &tsDnode.dnodeEps->dnodeEps[i];
cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_String) {
dError("failed to read %s, dnodeId not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
pEp->dnodeId = atoi(dnodeId->valuestring);
cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
if (!isMnode || isMnode->type != cJSON_String) {
dError("failed to read %s, isMnode not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
pEp->isMnode = atoi(isMnode->valuestring);
cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s, dnodeFqdn not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
tstrncpy(pEp->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
if (!dnodePort || dnodePort->type != cJSON_String) {
dError("failed to read %s, dnodePort not found", tsDnode.file);
goto PRASE_DNODE_OVER;
}
pEp->dnodePort = atoi(dnodePort->valuestring);
}
dInfo("succcessed to read file %s", tsDnode.file);
dnodePrintDnodes();
PRASE_DNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
if (dnodeIsEpChanged(tsDnode.dnodeId, tsLocalEp)) {
dError("localEp %s different with %s and need reconfigured", tsLocalEp, tsDnode.file);
return -1;
}
dnodeResetDnodes(tsDnode.dnodeEps);
terrno = 0;
return 0;
}
static int32_t dnodeWriteDnodes() {
FILE *fp = fopen(tsDnode.file, "w");
if (!fp) {
dError("failed to write %s since %s", tsDnode.file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 30000;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsDnode.dnodeId);
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsDnode.clusterId);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsDnode.dropped);
len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
for (int32_t i = 0; i < tsDnode.dnodeEps->dnodeNum; ++i) {
SDnodeEp *ep = &tsDnode.dnodeEps->dnodeEps[i];
len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId);
len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode);
len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort);
if (i < tsDnode.dnodeEps->dnodeNum - 1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", tsDnode.file);
return 0;
}
static void dnodeSendStatusMsg() {
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
dError("failed to malloc status message");
return;
}
pStatus->sversion = htonl(tsVersion);
pStatus->dnodeId = htonl(dnodeGetDnodeId());
pStatus->clusterId = htobe64(dnodeGetClusterId());
pStatus->rebootTime = htonl(tsDnode.rebootTime);
pStatus->numOfCores = htonl(tsNumOfCores);
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
pStatus->clusterCfg.checkTime = 0;
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
dnodeGetVnodeLoads(&pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS};
dnodeSendMsgToMnode(&rpcMsg);
}
static void dnodeUpdateCfg(SDnodeCfg *pCfg) {
if (tsDnode.dnodeId == 0) return;
if (tsDnode.dropped) return;
pthread_mutex_lock(&tsDnode.mutex);
tsDnode.dnodeId = pCfg->dnodeId;
tsDnode.clusterId = pCfg->clusterId;
tsDnode.dropped = pCfg->dropped;
dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, pCfg->dnodeId, pCfg->clusterId);
dnodeWriteDnodes();
pthread_mutex_unlock(&tsDnode.mutex);
}
static void dnodeUpdateDnodeEps(SDnodeEps *pEps) {
if (pEps == NULL || pEps->dnodeNum <= 0) return;
pthread_mutex_lock(&tsDnode.mutex);
if (pEps->dnodeNum != tsDnode.dnodeEps->dnodeNum) {
dnodeResetDnodes(pEps);
dnodeWriteDnodes();
} else {
int32_t size = pEps->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
if (memcmp(tsDnode.dnodeEps, pEps, size) != 0) {
dnodeResetDnodes(pEps);
dnodeWriteDnodes();
}
}
pthread_mutex_unlock(&tsDnode.mutex);
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) return;
SStatusRsp *pStatusRsp = pMsg->pCont;
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId);
dnodeUpdateCfg(pCfg);
if (pCfg->dropped) return;
SDnodeEps *pEps = &pStatusRsp->dnodeEps;
pEps->dnodeNum = htonl(pEps->dnodeNum);
for (int32_t i = 0; i < pEps->dnodeNum; ++i) {
pEps->dnodeEps[i].dnodeId = htonl(pEps->dnodeEps[i].dnodeId);
pEps->dnodeEps[i].dnodePort = htons(pEps->dnodeEps[i].dnodePort);
}
dnodeUpdateDnodeEps(pEps);
}
static void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont;
int32_t code = taosCfgDynamicOptions(pCfg->config);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
static void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg));
dnodeGetStartup(pStartup);
dInfo("startup msg is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStartup, .contLen = sizeof(SStartupMsg)};
rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont);
}
static void *dnodeThreadRoutine(void *param) {
int32_t ms = tsStatusInterval * 1000;
while (!tsDnode.threadStop) {
if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
continue;
} else {
dnodeSendStatusMsg();
}
taosMsleep(ms);
}
}
int32_t dnodeInitDnode() {
tsDnode.dnodeId = 0;
tsDnode.clusterId = 0;
tsDnode.dnodeEps = NULL;
snprintf(tsDnode.file, sizeof(tsDnode.file), "%s/dnode.json", tsDnodeDir);
tsDnode.rebootTime = taosGetTimestampSec();
tsDnode.dropped = 0;
pthread_mutex_init(&tsDnode.mutex, NULL);
tsDnode.threadStop = false;
tsDnode.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsDnode.dnodeHash == NULL) {
dError("failed to init dnode hash");
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
tsDnode.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
if (tsDnode.threadId == NULL) {
dError("failed to init dnode thread");
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
int32_t code = dnodeReadDnodes();
if (code != 0) {
dError("failed to read file:%s since %s", tsDnode.file, tstrerror(code));
return code;
}
dInfo("dnode-dnode is initialized");
return 0;
}
void dnodeCleanupDnode() {
if (tsDnode.threadId != NULL) {
tsDnode.threadStop = true;
taosDestoryThread(tsDnode.threadId);
tsDnode.threadId = NULL;
}
pthread_mutex_lock(&tsDnode.mutex);
if (tsDnode.dnodeEps != NULL) {
free(tsDnode.dnodeEps);
tsDnode.dnodeEps = NULL;
}
if (tsDnode.dnodeHash) {
taosHashCleanup(tsDnode.dnodeHash);
tsDnode.dnodeHash = NULL;
}
pthread_mutex_unlock(&tsDnode.mutex);
pthread_mutex_destroy(&tsDnode.mutex);
dInfo("dnode-dnode is cleaned up");
}
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEpSet(pEpSet);
}
switch (msgType) {
case TSDB_MSG_TYPE_NETWORK_TEST:
dnodeProcessStartupReq(pMsg);
break;
case TSDB_MSG_TYPE_CONFIG_DNODE_IN:
dnodeProcessConfigDnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_STATUS_RSP:
dnodeProcessStatusRsp(pMsg);
break;
default:
dError("RPC %p, %s not processed", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}
......@@ -14,108 +14,42 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#include "dnodeEps.h"
#include "dnodeMsg.h"
#include "dnodeTrans.h"
#include "mnode.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeTransport.h"
#include "dnodeVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "tconfig.h"
#include "tnote.h"
#include "tstep.h"
#include "vnode.h"
#include "wal.h"
static struct {
EDnStat runStatus;
SStartupStep startup;
SStartupMsg startup;
EDnStat runStat;
SSteps *steps;
} tsDnode;
} tsInt;
EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
EDnStat dnodeGetRunStat() { return tsInt.runStat; }
void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; }
void dnodeReportStartup(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 0;
SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0;
}
static void dnodeReportStartupFinished(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 1;
void dnodeReportStartupFinished(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 1;
}
void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); }
static int32_t dnodeInitVnode() {
SVnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.ReportStartup = dnodeReportStartup;
return vnodeInit(para);
}
static int32_t dnodeInitMnode() {
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId();
return mnodeInit(para);
}
static int32_t dnodeInitTfs() {}
static int32_t dnodeInitMain() {
tsDnode.runStatus = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
return taosCheckGlobalCfg();
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
void dnodeGetStartup(SStartupMsg *pStartup) { memcpy(pStartup, &tsInt.startup, sizeof(SStartupMsg)); }
static int32_t dnodeCheckRunning(char *dir) {
char filepath[256] = {0};
......@@ -164,27 +98,68 @@ static int32_t dnodeInitDir() {
return 0;
}
static void dnodeCleanupDir() {}
static int32_t dnodeInitMain() {
tsInt.runStat = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
if (taosCheckGlobalCfg() != 0) {
return -1;
}
dnodeInitDir();
return -1;
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
int32_t dnodeInit() {
SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (steps == NULL) return -1;
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir);
taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL);
taosStepAdd(steps, "dnode-tfs", NULL, NULL);
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps);
taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup);
taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode);
taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg);
tsDnode.steps = steps;
taosStepExec(tsDnode.steps);
tsInt.steps = steps;
taosStepExec(tsInt.steps);
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
dnodeReportStartupFinished("TDengine", "initialized successfully");
......@@ -196,7 +171,7 @@ int32_t dnodeInit() {
void dnodeCleanup() {
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
dnodeSetRunStat(DN_RUN_STAT_STOPPED);
taosStepCleanup(tsDnode.steps);
tsDnode.steps = NULL;
taosStepCleanup(tsInt.steps);
tsInt.steps = NULL;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeMnode.h"
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "cJSON.h"
#include "mnode.h"
static struct {
int8_t deployed;
int8_t dropped;
char file[PATH_MAX + 20];
pthread_mutex_t mutex;
} tsMnode = {0};
static int32_t dnodeReadMnode() {
int32_t len = 0;
int32_t maxLen = 300;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
fp = fopen(tsMnode.file, "r");
if (!fp) {
dDebug("file %s not exist", tsMnode.file);
goto PRASE_MNODE_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", tsMnode.file);
goto PRASE_MNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", tsMnode.file);
goto PRASE_MNODE_OVER;
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_String) {
dError("failed to read %s since deployed not found", tsMnode.file);
goto PRASE_MNODE_OVER;
}
tsMnode.deployed = atoi(deployed->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", tsMnode.file);
goto PRASE_MNODE_OVER;
}
tsMnode.dropped = atoi(dropped->valuestring);
dInfo("succcessed to read file %s", tsMnode.file);
PRASE_MNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
return 0;
}
static int32_t dnodeWriteMnode() {
FILE *fp = fopen(tsMnode.file, "w");
if (!fp) {
dError("failed to write %s since %s", tsMnode.file, strerror(errno));
return -1;
}
int32_t len = 0;
int32_t maxLen = 300;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.dropped);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsMnode.dropped);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
dInfo("successed to write %s", tsMnode.file);
return 0;
}
static int32_t dnodeStartMnode(SCreateMnodeMsg *pCfg) {
int32_t code = 0;
if (pCfg->dnodeId != dnodeGetDnodeId()) {
code = TSDB_CODE_DND_DNODE_ID_NOT_MATCHED;
dError("failed to start mnode since %s", tstrerror(code));
return code;
}
if (tsMnode.dropped) {
code = TSDB_CODE_DND_MNODE_ALREADY_DROPPED;
dError("failed to start mnode since %s", tstrerror(code));
return code;
}
if (tsMnode.deployed) {
dError("failed to start mnode since its already deployed");
return 0;
}
tsMnode.deployed = 1;
tsMnode.dropped = 0;
code = dnodeWriteMnode();
if (code != 0) {
tsMnode.deployed = 0;
dError("failed to start mnode since %s", tstrerror(code));
return code;
}
code = mnodeDeploy();
if (code != 0) {
tsMnode.deployed = 0;
dError("failed to start mnode since %s", tstrerror(code));
return code;
}
code = mnodeStart();
if (code != 0) {
tsMnode.deployed = 0;
dError("failed to start mnode since %s", tstrerror(code));
return code;
}
tsMnode.deployed = 1;
return 0;
}
static int32_t dnodeDropMnode(SDropMnodeMsg *pCfg) {
int32_t code = 0;
if (pCfg->dnodeId != dnodeGetDnodeId()) {
code = TSDB_CODE_DND_DNODE_ID_NOT_MATCHED;
dError("failed to drop mnode since %s", tstrerror(code));
return code;
}
if (tsMnode.dropped) {
code = TSDB_CODE_DND_MNODE_ALREADY_DROPPED;
dError("failed to drop mnode since %s", tstrerror(code));
return code;
}
if (!tsMnode.deployed) {
dError("failed to drop mnode since not deployed");
return 0;
}
mnodeStop();
tsMnode.deployed = 0;
tsMnode.dropped = 1;
code = dnodeWriteMnode();
if (code != 0) {
tsMnode.deployed = 1;
tsMnode.dropped = 0;
dError("failed to drop mnode since %s", tstrerror(code));
return code;
}
mnodeUnDeploy();
tsMnode.deployed = 0;
return 0;
}
static void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
int32_t code = dnodeStartMnode(pCfg);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
static void dnodeProcessDropMnodeReq(SRpcMsg *pMsg) {
SDropMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
int32_t code = dnodeDropMnode(pCfg);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
static bool dnodeNeedDeployMnode() {
if (dnodeGetDnodeId() > 0) return false;
if (dnodeGetClusterId() > 0) return false;
if (strcmp(tsFirst, tsLocalEp) != 0) return false;
return true;
}
int32_t dnodeInitMnode() {
tsMnode.dropped = 0;
tsMnode.deployed = 0;
snprintf(tsMnode.file, sizeof(tsMnode.file), "%s/mnode.json", tsDnodeDir);
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetDnodeEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId();
int32_t code = mnodeInit(para);
if (code != 0) {
dError("failed to init mnode module since %s", tstrerror(code));
return code;
}
code = dnodeReadMnode();
if (code != 0) {
dError("failed to read file:%s since %s", tsMnode.file, tstrerror(code));
return code;
}
if (tsMnode.dropped) {
dError("mnode already dropped, undeploy it");
mnodeUnDeploy();
return 0;
}
if (!tsMnode.deployed) {
bool needDeploy = dnodeNeedDeployMnode();
if (needDeploy) {
code = mnodeDeploy();
} else {
return 0;
}
if (code != 0) {
dError("failed to deploy mnode since %s", tstrerror(code));
return code;
}
tsMnode.deployed = 1;
}
return mnodeStart();
}
void dnodeCleanupMnode() {
if (tsMnode.deployed) {
mnodeStop();
}
mnodeCleanup();
}
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_MNODE_IN:
dnodeProcessCreateMnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_DROP_MNODE_IN:
dnodeProcessDropMnodeReq(pMsg);
break;
default:
mnodeProcessMsg(pMsg);
}
}
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
}
\ No newline at end of file
......@@ -20,30 +20,116 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeTrans.h"
#include "dnodeEps.h"
#include "dnodeMsg.h"
#include "mnode.h"
#include "vnode.h"
typedef void (*RpcMsgFp)(SRpcMsg *pMsg);
#include "dnodeTransport.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
static struct {
void *serverRpc;
void *clientRpc;
void *peerRpc;
void *shellRpc;
int32_t queryReqNum;
int32_t submitReqNum;
RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX];
RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX];
void *clientRpc;
MsgFp msgFp[TSDB_MSG_TYPE_MAX];
} tsTrans;
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
static void dnodeInitMsgFp() {
// msg from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = dnodeProcessVnodeFetchMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dnodeProcessVnodeQueryMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = dnodeProcessVnodeWriteMsg;
// msg from client to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dnodeProcessMnodeMsg;
// message from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessDnodeMsg;
// message from mnode to vnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessVnodeWriteMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dnodeProcessMnodeMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dnodeProcessVnodeMgmtMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dnodeProcessMnodeMsg;
// message from dnode to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_GRANT] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dnodeProcessDnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STATUS] = dnodeProcessMnodeMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
}
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dnodeProcessStartupReq(pMsg);
dnodeProcessDnodeMsg(pMsg, pEpSet);
return;
}
......@@ -61,10 +147,10 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
(*fp)(pMsg, pEpSet);
} else {
dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
......@@ -73,36 +159,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
}
static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
static int32_t dnodeInitPeerServer() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort;
......@@ -113,8 +170,8 @@ static int32_t dnodeInitServer() {
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
tsTrans.serverRpc = rpcOpen(&rpcInit);
if (tsTrans.serverRpc == NULL) {
tsTrans.peerRpc = rpcOpen(&rpcInit);
if (tsTrans.peerRpc == NULL) {
dError("failed to init peer rpc server");
return -1;
}
......@@ -123,15 +180,15 @@ static int32_t dnodeInitServer() {
return 0;
}
static void dnodeCleanupServer() {
if (tsTrans.serverRpc) {
rpcClose(tsTrans.serverRpc);
tsTrans.serverRpc = NULL;
static void dnodeCleanupPeerServer() {
if (tsTrans.peerRpc) {
rpcClose(tsTrans.peerRpc);
tsTrans.peerRpc = NULL;
dInfo("dnode peer server is closed");
}
}
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
......@@ -141,14 +198,10 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeEps(pEpSet);
}
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
dTrace("RPC %p, peer rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
(*fp)(pMsg, pEpSet);
} else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
}
......@@ -157,29 +210,8 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
static int32_t dnodeInitClient() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
......@@ -210,8 +242,8 @@ static void dnodeCleanupClient() {
}
}
static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
......@@ -234,17 +266,10 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
if (msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&tsTrans.queryReqNum, 1);
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_32(&tsTrans.submitReqNum, 1);
} else {
}
RpcMsgFp fp = tsTrans.shellMsgFp[msgType];
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
(*fp)(pMsg, pEpSet);
} else {
dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
......@@ -253,41 +278,29 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet);
SEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
}
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
int32_t code = dnodeGetUserAuthFromMnode(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code;
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pMsg;
rpcMsg.contLen = sizeof(SAuthMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
dDebug("user:%s, auth msg received from mnodes", user);
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
......@@ -298,55 +311,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
return rpcRsp.code;
}
static int32_t dnodeInitShell() {
tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
// the following message shall be treated as mnode write
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg;
// the following message shall be treated as mnode query
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
static int32_t dnodeInitShellServer() {
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
......@@ -373,7 +338,7 @@ static int32_t dnodeInitShell() {
return 0;
}
static void dnodeCleanupShell() {
static void dnodeCleanupShellServer() {
if (tsTrans.shellRpc) {
rpcClose(tsTrans.shellRpc);
tsTrans.shellRpc = NULL;
......@@ -385,11 +350,11 @@ int32_t dnodeInitTrans() {
return -1;
}
if (dnodeInitServer() != 0) {
if (dnodeInitPeerServer() != 0) {
return -1;
}
if (dnodeInitShell() != 0) {
if (dnodeInitShellServer() != 0) {
return -1;
}
......@@ -397,7 +362,15 @@ int32_t dnodeInitTrans() {
}
void dnodeCleanupTrans() {
dnodeCleanupShell();
dnodeCleanupServer();
dnodeCleanupShellServer();
dnodeCleanupPeerServer();
dnodeCleanupClient();
}
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg);
}
\ No newline at end of file
此差异已折叠。
......@@ -120,7 +120,7 @@ typedef struct SDnodeObj {
int64_t createdTime;
int64_t updateTime;
int64_t lastAccess;
int64_t lastReboot; // time stamp for last reboot
int64_t rebootTime; // time stamp for last reboot
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
uint16_t port;
......@@ -208,7 +208,7 @@ typedef struct {
typedef struct SDbObj {
SdbHead head;
char name[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char name[TSDB_FULL_DB_NAME_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
......@@ -236,7 +236,7 @@ typedef struct SVgObj {
int64_t updateTime;
int32_t lbDnodeId;
int32_t lbTime;
char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char dbName[TSDB_FULL_DB_NAME_LEN];
int8_t inUse;
int8_t accessState;
int8_t status;
......@@ -288,7 +288,7 @@ typedef struct {
void *pIter;
void *pVgIter;
void **ppShow;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int16_t offset[TSDB_MAX_COLUMNS];
int32_t bytes[TSDB_MAX_COLUMNS];
char payload[];
......
......@@ -22,12 +22,14 @@
extern "C" {
#endif
typedef enum { MN_STATUS_UNINIT = 0, MN_STATUS_INIT = 1, MN_STATUS_READY = 2, MN_STATUS_CLOSING = 3 } EMnStatus;
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
int64_t mnodeGetClusterId();
EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
......
......@@ -24,8 +24,8 @@ extern "C" {
int32_t mnodeInitMnode();
void mnodeCleanupMnode();
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect);
#ifdef __cplusplus
}
......
......@@ -20,5 +20,5 @@
int32_t mnodeInitMnode() { return 0; }
void mnodeCleanupMnode() {}
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) {}
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {}
\ No newline at end of file
void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {}
void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {}
\ No newline at end of file
......@@ -54,7 +54,7 @@ int64_t mnodeGetClusterId() { return tsMint.clusterId; }
EMnStatus mnodeGetStatus() { return tsMint.state; }
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) {
(*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg);
}
......@@ -250,3 +250,6 @@ void mnodeCleanup() {
mInfo("mnode is cleaned up");
}
}
int32_t mnodeStart() { return 0; }
void mnodeStop() {}
\ No newline at end of file
add_subdirectory(meta)
add_subdirectory(tq)
add_subdirectory(tsdb)
add_subdirectory(impl)
\ No newline at end of file
add_subdirectory(meta)
add_subdirectory(tq)
add_subdirectory(tsdb)
aux_source_directory(src VNODE_SRC)
add_library(vnode STATIC ${VNODE_SRC})
target_include_directories(
......
......@@ -16,6 +16,8 @@
#ifndef _TD_VNODE_COMMIT_H_
#define _TD_VNODE_COMMIT_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -16,19 +16,14 @@
#ifndef _TD_VNODE_INT_H_
#define _TD_VNODE_INT_H_
#include "os.h"
#include "vnode.h"
#include "amalloc.h"
#include "meta.h"
#include "sync.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tlog.h"
#include "tq.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "tworker.h"
#include "vnode.h"
#include "wal.h"
#ifdef __cplusplus
......@@ -44,71 +39,16 @@ extern int32_t vDebugFlag;
#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct STsdbCfg {
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
uint8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
} STsdbCfg;
typedef struct SMetaCfg {
} SMetaCfg;
typedef struct SVnodeCfg {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped;
int8_t quorum;
SWalCfg wal;
STsdbCfg tsdb;
SMetaCfg meta;
SSyncCluster sync;
} SVnodeCfg;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
typedef struct SVnode {
int32_t vgId;
SVnodeCfg cfg;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
SWal *pWal;
void *pQuery;
SSyncNode *pSync;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SVnodeCfg cfg;
SSyncServerState term;
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t role;
int8_t accessState;
int8_t dropped;
int8_t status;
pthread_mutex_t statusMutex;
} SVnode;
typedef struct {
int32_t len;
void *rsp;
void *qhandle; // used by query and retrieve msg
} SVnRsp;
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
void vnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
#endif
......
......@@ -16,7 +16,7 @@
#ifndef _TD_VNODE_MEM_ALLOCATOR_H_
#define _TD_VNODE_MEM_ALLOCATOR_H_
#include "os.h"
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
......
......@@ -13,19 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_CHECK_H_
#define _TD_DNODE_CHECK_H_
#ifndef _TD_VNODE_READ_H_
#define _TD_VNODE_READ_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
#include "vnodeInt.h"
int32_t dnodeInitCheck();
void dnodeCleanupCheck();
void vnodeProcessReadMsg(SVnode *pVnode, SVnodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_CHECK_H_*/
#endif /*_TD_VNODE_READ_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_WRITE_H_
#define _TD_VNODE_WRITE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
void vnodeProcessWriteMsg(SVnode* pVnode, SVnodeMsg* pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_WRITE_H_*/
......@@ -19,6 +19,7 @@ static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
int vnodeAsyncCommit(SVnode *pVnode) {
#if 0
if (vnodeStartCommit(pVnode) < 0) {
// TODO
}
......@@ -39,6 +40,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
// TODO
}
#endif
return 0;
}
......
......@@ -14,47 +14,59 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "httpInt.h"
#include "httpResp.h"
#include "httpContext.h"
#include "httpHandle.h"
bool httpDecodeRequest(HttpContext* pContext) {
if (pContext->decodeMethod->fpDecode == NULL) {
return false;
}
#include "vnodeInt.h"
#include "tqueue.h"
return (*pContext->decodeMethod->fpDecode)(pContext);
}
int32_t vnodeInit(SVnodePara para) { return 0; }
void vnodeCleanup() {}
/**
* Process the request from http pServer
*/
bool httpProcessData(HttpContext* pContext) {
if (!httpAlterContextState(pContext, HTTP_CONTEXT_STATE_READY, HTTP_CONTEXT_STATE_HANDLING)) {
httpTrace("context:%p, fd:%d, state:%s not in ready state, stop process request", pContext, pContext->fd,
httpContextStateStr(pContext->state));
pContext->error = true;
httpCloseContextByApp(pContext);
return false;
}
SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }
void vnodeClose(SVnode *pVnode) {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
void vnodeDrop(SVnode *pVnode) {}
int32_t vnodeCompact(SVnode *pVnode) { return 0; }
int32_t vnodeSync(SVnode *pVnode) { return 0; }
// handle Cross-domain request
if (strcmp(pContext->parser->method, "OPTIONS") == 0) {
httpTrace("context:%p, fd:%d, process options request", pContext, pContext->fd);
httpSendOptionResp(pContext, "process options request success");
} else {
if (!httpDecodeRequest(pContext)) {
/*
* httpCloseContextByApp has been called when parsing the error
*/
// httpCloseContextByApp(pContext);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
SVnodeMsg *vnodeInitMsg(int32_t msgNum) {
SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
return NULL;
} else {
httpClearParser(pContext->parser);
httpProcessRequest(pContext);
pMsg->allocNum = msgNum;
return pMsg;
}
}
int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) {
if (pMsg->curNum >= pMsg->allocNum) {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
return true;
pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;
}
void vnodeCleanupMsg(SVnodeMsg *pMsg) {
for (int32_t i = 0; i < pMsg->curNum; ++i) {
rpcFreeCont(pMsg->rpcMsg[i].pCont);
}
taosFreeQitem(pMsg);
}
void vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg, EVnMsgType msgType) {
switch (msgType) {
case VN_MSG_TYPE_WRITE:
break;
case VN_MSG_TYPE_APPLY:
break;
case VN_MSG_TYPE_SYNC:
break;
case VN_MSG_TYPE_QUERY:
break;
case VN_MSG_TYPE_FETCH:
break;
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnodeRead.h"
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnodeWrite.h"
......@@ -6,4 +6,4 @@ target_sources(VMATest
"vnodeMemAllocatorTest.cpp"
)
target_include_directories(VMATest PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc")
target_link_libraries(VMATest os gtest_main)
\ No newline at end of file
target_link_libraries(VMATest os gtest_main vnode)
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册