提交 0214eda6 编写于 作者: L lichuang

Merge branch '3.0' into feature/sync-implementation

# Small Materialized Aggragates
**SMA** (**S**mall **M**aterialized **A**ggrates) is used to speed up the query process on materialized data cube in TDengine. TDengine 3.0 gives more flexibility on the SMA configurations.
There are two kinds of SMA in TDengine:
1. Block-wise SMA
2. Time-range-wise SMA
<!--
```plantuml
@startmindmap mind_map_test
* SMA
** Block-wise SMA
** Time-range-wise SMA
@endmindmap
``` -->
![SMA in TDengine 3.0](http://www.plantuml.com/plantuml/png/Kr1GK70eBaaiAidDp4l9JInG0D7nG4PyIMfn2HTGMa5B8TZN4SBIKd3AoK_ErYtFB4v55Wt9p4tLBKhCIqz5bN981HeACHW0)
## Block-wise SMA
Block-wise SMA is created by default when the data are committed. Since time-series data are saved as block data in files, a corresponding SMA is create when the data block is written. The default block-wise SMA includes:
1. sum(*)
2. max(*)
3. min(*)
By default, the system will create SMA for each column except those columns with type *binary* and *nchar*. However, users can change the behavior by the keyword **NOSMA** to disable the SMA for a certain column like below:
```SQL
# create a super table with the SMA on column b disabled
create table st (ts timestamp, a int, b int NOSMA, c double) tags (tg1 binary(10), tg2 int);
```
## Time-range-wise SMA
In addition to the default block-wise SMA, users can create their own SMAs ondemand. Below is an example to create a SMA.
```SQL
# create a SMA every 10 minutes with SMA of sum, max and min
create sma_indx sma_10min on st (sum(*), max(*), min(*), twa(*)) interval(10m);
```
Users can also drop a time-range-wise SMA like below:
```SQL
# drop the sma index
drop sma_index sma_5min on st;
```
**NOTE: Creating an SMA index is a heavy operation which may take a long time and block the write operation. So create the time-range-wise SMA when creating the table or when there are not too much data.**
\ No newline at end of file
......@@ -40,87 +40,112 @@ enum {
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_ACCT, "alter-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_ACCT, "drop-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_USER, "create-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_USER, "alter-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
// message from mnode to dnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_ACCT, "alter-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_ACCT, "drop-acct" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_USER, "create-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_USER, "alter-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_DB, "sync-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_TOPIC, "create-topic" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TOPIC, "drop-topic" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TOPIC, "alter-topic" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" )
// message from dnode to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
// message for topic
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" )
#ifndef TAOS_MESSAGE_C
TSDB_MSG_TYPE_MAX // 147
......
......@@ -16,93 +16,111 @@
#ifndef _TD_VNODE_H_
#define _TD_VNODE_H_
#include "os.h"
#include "taosmsg.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SVnode SVnode;
typedef struct {
char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
int8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
int8_t quorum;
int8_t replica;
int8_t walLevel;
int32_t fsyncPeriod; // millisecond
SVnodeDesc replicas[TSDB_MAX_REPLICA];
} SVnodeCfg;
typedef struct {
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int64_t tablesNum;
} SVnodeStatisic;
typedef struct {
/**
* Send messages to other dnodes, such as create vnode message.
*
* @param epSet, the endpoint list of dnodes.
* @param rpcMsg, message to be sent.
*/
void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/**
* Send messages to mnode, such as config message.
*
* @param rpcMsg, message to be sent.
*/
void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
/**
* Report the startup progress.
*/
void (*ReportStartup)(char *name, char *desc);
} SVnodeFp;
int8_t syncRole;
} SVnodeStatus;
typedef struct {
SVnodeFp fp;
} SVnodePara;
int32_t accessState;
} SVnodeAccess;
typedef struct SVnodeMsg {
int32_t msgType;
int32_t code;
SRpcMsg rpcMsg; // original message from rpc
int32_t contLen;
char pCont[];
} SVnodeMsg;
/**
* Start initialize vnode module.
*
* @param para, initialization parameters.
* @return Error code.
*/
int32_t vnodeInit(SVnodePara para);
int32_t vnodeInit();
/**
* Cleanup vnode module.
*/
void vnodeCleanup();
typedef struct {
int32_t unused;
} SVnodeStat;
/**
* Get the statistical information of vnode.
*
* @param stat, statistical information.
* @param pVnode,
* @param pStat, statistical information.
* @return Error Code.
*/
int32_t vnodeGetStatistics(SVnodeStat *stat);
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat);
/**
* Get the status of all vnodes.
*
* @param status, status msg.
* @param pVnode,
* @param status, status information.
* @return Error Code.
*/
void vnodeGetStatus(struct SStatusMsg *status);
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus);
/**
* Set access permissions for all vnodes.
* Operation functions of vnode
*
* @param access, access permissions of vnodes.
* @param numOfVnodes, the size of vnodes.
* @return Error Code.
*/
void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes);
SVnode *vnodeOpen(int32_t vgId, const char *path);
void vnodeClose(SVnode *pVnode);
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
int32_t vnodeDrop(SVnode *pVnode);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
/**
* Interface for processing messages.
*
* @param msg, message to be processed.
* @param pVnode,
* @param pMsg, message to be processed.
*
*/
void vnodeProcessMsg(SRpcMsg *msg);
int32_t vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -37,21 +37,24 @@ shall be used to set up the protection.
*/
typedef void* taos_queue;
typedef void* taos_qset;
typedef void* taos_qall;
typedef void *taos_queue;
typedef void *taos_qset;
typedef void *taos_qall;
typedef void *(*FProcessItem)(void *pItem, void *ahandle);
typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle);
taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue);
void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems);
void *taosAllocateQitem(int size);
void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadQitem(taos_queue, int *type, void **pitem);
void taosFreeQitem(void *pItem);
int taosWriteQitem(taos_queue, void *pItem);
int taosReadQitem(taos_queue, void **pItem);
taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
int taosReadAllQitems(taos_queue, taos_qall);
int taosGetQitem(taos_qall, int *type, void **pitem);
int taosGetQitem(taos_qall, void **pItem);
void taosResetQitems(taos_qall);
taos_qset taosOpenQset();
......@@ -61,8 +64,8 @@ int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *);
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *);
int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param);
......
......@@ -22,13 +22,6 @@
extern "C" {
#endif
typedef int32_t (*ProcessStartFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef void (*ProcessEndFp)(void *ahandle, void *pMsg, int32_t qtype, int32_t code);
typedef bool (*ProcessWriteStartFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef void (*ProcessWriteSyncFp)(void *ahandle, int32_t code);
typedef void (*ProcessWriteEndFp)(void *ahandle, void *pMsg, int32_t qtype);
typedef struct SWorker {
int32_t id; // worker ID
pthread_t thread; // thread
......@@ -40,41 +33,36 @@ typedef struct SWorkerPool {
int32_t min; // min number of workers
int32_t num; // current number of workers
taos_qset qset;
const char * name;
ProcessStartFp startFp;
ProcessEndFp endFp;
SWorker * workers;
const char *name;
SWorker *workers;
pthread_mutex_t mutex;
} SWorkerPool;
typedef struct SWriteWorker {
int32_t id; // worker id
pthread_t thread; // thread
taos_qall qall;
taos_qset qset; // queue set
struct SWriteWorkerPool *pool;
} SWriteWorker;
typedef struct SMWorker {
int32_t id; // worker id
pthread_t thread; // thread
taos_qall qall;
taos_qset qset; // queue set
struct SMWorkerPool *pool;
} SMWorker;
typedef struct SWriteWorkerPool {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
const char * name;
ProcessWriteStartFp startFp;
ProcessWriteSyncFp syncFp;
ProcessWriteEndFp endFp;
SWriteWorker * workers;
pthread_mutex_t mutex;
} SWriteWorkerPool;
typedef struct SMWorkerPool {
int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic
const char *name;
SMWorker *workers;
pthread_mutex_t mutex;
} SMWorkerPool;
int32_t tWorkerInit(SWorkerPool *pool);
void tWorkerCleanup(SWorkerPool *pool);
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle);
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue);
int32_t tWriteWorkerInit(SWriteWorkerPool *pool);
void tWriteWorkerCleanup(SWriteWorkerPool *pool);
taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle);
void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue);
int32_t tMWorkerInit(SMWorkerPool *pool);
void tMWorkerCleanup(SMWorkerPool *pool);
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue);
#ifdef __cplusplus
}
......
......@@ -3,4 +3,4 @@ add_subdirectory(util)
add_subdirectory(common)
add_subdirectory(libs)
add_subdirectory(client)
add_subdirectory(server)
\ No newline at end of file
add_subdirectory(dnode)
\ No newline at end of file
add_subdirectory(mnode)
add_subdirectory(vnode)
add_subdirectory(qnode)
add_subdirectory(dnode)
aux_source_directory(. TAOSD_SRC)
add_executable(taosd ${TAOSD_SRC})
target_link_libraries(
taosd
PUBLIC dnode
PUBLIC util
)
\ No newline at end of file
add_subdirectory(mgmt)
aux_source_directory(src DNODE_SRC)
add_library(dnode ${DNODE_SRC})
add_executable(taosd ${DNODE_SRC})
target_link_libraries(
dnode
taosd
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
......@@ -10,7 +10,7 @@ target_link_libraries(
PUBLIC taos
)
target_include_directories(
dnode
taosd
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/dnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
\ No newline at end of file
)
......@@ -13,16 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_EPS_H_
#define _TD_DNODE_EPS_H_
#ifndef _TD_DNODE_DNODE_H_
#define _TD_DNODE_DNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitEps();
void dnodeCleanupEps();
int32_t dnodeInitDnode();
void dnodeCleanupDnode();
void dnodeProcessStatusRsp(SRpcMsg *pMsg);
void dnodeProcessStartupReq(SRpcMsg *pMsg);
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
int32_t dnodeInitConfig();
void dnodeCleanupConfig();
void dnodeUpdateCfg(SDnodeCfg *data);
void dnodeUpdateDnodeEps(SDnodeEps *data);
......@@ -32,11 +38,11 @@ int64_t dnodeGetClusterId();
void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
void dnodeGetEpSetForShell(SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_EPS_H_*/
\ No newline at end of file
#endif /*_TD_DNODE_DNODE_H_*/
\ No newline at end of file
......@@ -24,8 +24,6 @@ extern "C" {
#include "tglobal.h"
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h"
#include "dnode.h"
extern int32_t dDebugFlag;
......@@ -38,6 +36,9 @@ extern int32_t dDebugFlag;
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat;
int32_t dnodeInit();
void dnodeCleanup();
EDnStat dnodeGetRunStat();
void dnodeSetRunStat();
void dnodeGetStartup(SStartupStep *);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_MNODE_H_
#define _TD_DNODE_MNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitMnode();
void dnodeCleanupMnode();
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_MNODE_H_*/
\ No newline at end of file
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_TRANS_H_
#define _TD_DNODE_TRANS_H_
#ifndef _TD_DNODE_TRANSPORT_H_
#define _TD_DNODE_TRANSPORT_H_
#ifdef __cplusplus
extern "C" {
......@@ -30,4 +30,4 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
}
#endif
#endif /*_TD_DNODE_TRANS_H_*/
#endif /*_TD_DNODE_TRANSPORT_H_*/
......@@ -13,19 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_CHECK_H_
#define _TD_DNODE_CHECK_H_
#ifndef _TD_DNODE_VNODES_H_
#define _TD_DNODE_VNODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
int32_t dnodeInitCheck();
void dnodeCleanupCheck();
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_CHECK_H_*/
#endif /*_TD_DNODE_VNODES_H_*/
\ No newline at end of file
......@@ -12,9 +12,9 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "ulog.h"
#include "dnode.h"
#define _DEFAULT_SOURCE
#include "dnodeInt.h"
static bool stop = false;
static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; }
......@@ -31,17 +31,15 @@ int main(int argc, char const *argv[]) {
int32_t code = dnodeInit();
if (code != 0) {
uInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
dInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
exit(EXIT_FAILURE);
}
uInfo("Started TDengine service successfully.");
while (!stop) {
taosMsleep(100);
}
uInfo("TDengine is shut down!");
dInfo("TDengine is shut down!");
dnodeCleanup();
return 0;
......
......@@ -14,108 +14,42 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeCheck.h"
#include "dnodeEps.h"
#include "dnodeMsg.h"
#include "dnodeTrans.h"
#include "mnode.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeTransport.h"
#include "dnodeVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "tconfig.h"
#include "tnote.h"
#include "tstep.h"
#include "vnode.h"
#include "wal.h"
static struct {
EDnStat runStatus;
SStartupStep startup;
SSteps *steps;
} tsDnode;
} tsInt;
EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
EDnStat dnodeGetRunStat() { return tsInt.runStatus; }
void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
void dnodeSetRunStat(EDnStat stat) { tsInt.runStatus = stat; }
void dnodeReportStartup(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
static void dnodeReportStartup(char *name, char *desc) {
SStartupStep *startup = &tsInt.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 0;
}
static void dnodeReportStartupFinished(char *name, char *desc) {
SStartupStep *startup = &tsDnode.startup;
SStartupStep *startup = &tsInt.startup;
tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 1;
}
void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); }
static int32_t dnodeInitVnode() {
SVnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.ReportStartup = dnodeReportStartup;
return vnodeInit(para);
}
static int32_t dnodeInitMnode() {
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId();
return mnodeInit(para);
}
static int32_t dnodeInitTfs() {}
static int32_t dnodeInitMain() {
tsDnode.runStatus = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
return taosCheckGlobalCfg();
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsInt.startup, sizeof(SStartupStep)); }
static int32_t dnodeCheckRunning(char *dir) {
char filepath[256] = {0};
......@@ -164,27 +98,68 @@ static int32_t dnodeInitDir() {
return 0;
}
static void dnodeCleanupDir() {}
static int32_t dnodeInitMain() {
tsInt.runStatus = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
if (taosCheckGlobalCfg() != 0) {
return -1;
}
dnodeInitDir();
return -1;
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
int32_t dnodeInit() {
SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (steps == NULL) return -1;
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir);
taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL);
taosStepAdd(steps, "dnode-tfs", NULL, NULL);
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps);
taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup);
taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode);
taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg);
tsDnode.steps = steps;
taosStepExec(tsDnode.steps);
tsInt.steps = steps;
taosStepExec(tsInt.steps);
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
dnodeReportStartupFinished("TDengine", "initialized successfully");
......@@ -196,7 +171,7 @@ int32_t dnodeInit() {
void dnodeCleanup() {
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
dnodeSetRunStat(DN_RUN_STAT_STOPPED);
taosStepCleanup(tsDnode.steps);
tsDnode.steps = NULL;
taosStepCleanup(tsInt.steps);
tsInt.steps = NULL;
}
}
......@@ -13,67 +13,50 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_H_
#define _TD_DNODE_H_
#define _DEFAULT_SOURCE
#include "dnodeMnode.h"
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "mnode.h"
int32_t dnodeInitMnode() {
SMnodePara para;
para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId();
return mnodeInit(para);
}
#ifdef __cplusplus
extern "C" {
#endif
void dnodeCleanupMnode() { mnodeCleanup(); }
struct SRpcEpSet;
struct SRpcMsg;
/**
* Initialize and start the dnode module.
*
* @return Error code.
*/
int32_t dnodeInit();
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->mnodeNum = htonl(pCfg->mnodeNum);
for (int32_t i = 0; i < pCfg->mnodeNum; ++i) {
pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId);
pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort);
}
/**
* Stop and cleanup dnode module.
*/
void dnodeCleanup();
if (pCfg->dnodeId != dnodeGetDnodeId()) {
dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
}
/**
* Send messages to other dnodes, such as create vnode message.
*
* @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent.
*/
void dnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/**
* Send messages to mnode, such as config message.
*
* @param rpcMsg, message to be sent.
*/
void dnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
if (mnodeGetStatus() == MN_STATUS_READY) return 0;
/**
* Send redirect message to dnode or shell.
*
* @param rpcMsg, message to be sent.
* @param forShell, used to identify whether to send to shell or dnode.
*/
void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
return mnodeDeploy();
}
/**
* Report the startup progress.
*/
void dnodeReportStartup(char *name, char *desc);
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
int32_t code = dnodeStartMnode(pMsg);
#ifdef __cplusplus
}
#endif
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
#endif /*_TD_DNODE_H_*/
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
\ No newline at end of file
......@@ -20,24 +20,108 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeTrans.h"
#include "dnodeEps.h"
#include "dnodeMsg.h"
#include "dnodeTransport.h"
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeVnodes.h"
#include "mnode.h"
#include "vnode.h"
typedef void (*RpcMsgFp)(SRpcMsg *pMsg);
typedef void (*MsgFp)(SRpcMsg *pMsg);
static struct {
void *serverRpc;
void *clientRpc;
void *shellRpc;
int32_t queryReqNum;
int32_t submitReqNum;
RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX];
RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX];
void *serverRpc;
void *clientRpc;
void *shellRpc;
MsgFp msgFp[TSDB_MSG_TYPE_MAX];
} tsTrans;
static void dnodeInitMsgFp() {
// msg from client to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
// msg from client to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessMsg;
// message from mnode to dnode
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = vnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = mnodeProcessMsg;
// message from dnode to mnode
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = NULL;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
}
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
int32_t msgType = pMsg->msgType;
......@@ -61,7 +145,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
......@@ -74,35 +158,6 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort;
......@@ -145,7 +200,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
dnodeUpdateMnodeEps(pEpSet);
}
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
......@@ -157,28 +212,6 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
static int32_t dnodeInitClient() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
......@@ -234,14 +267,7 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return;
}
if (msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&tsTrans.queryReqNum, 1);
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_32(&tsTrans.submitReqNum, 1);
} else {
}
RpcMsgFp fp = tsTrans.shellMsgFp[msgType];
MsgFp fp = tsTrans.msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg);
......@@ -299,54 +325,6 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
}
static int32_t dnodeInitShell() {
tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
// the following message shall be treated as mnode write
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg;
// the following message shall be treated as mnode query
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "vnode.h"
int32_t dnodeInitVnodes() { return vnodeInit(); }
void dnodeCleanupVnodes() { vnodeCleanup(); }
\ No newline at end of file
add_subdirectory(meta)
add_subdirectory(tq)
add_subdirectory(tsdb)
add_subdirectory(impl)
\ No newline at end of file
add_subdirectory(meta)
add_subdirectory(tq)
add_subdirectory(tsdb)
aux_source_directory(src VNODE_SRC)
add_library(vnode STATIC ${VNODE_SRC})
target_include_directories(
......
......@@ -16,6 +16,8 @@
#ifndef _TD_VNODE_COMMIT_H_
#define _TD_VNODE_COMMIT_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
......
......@@ -16,19 +16,14 @@
#ifndef _TD_VNODE_INT_H_
#define _TD_VNODE_INT_H_
#include "os.h"
#include "vnode.h"
#include "amalloc.h"
#include "meta.h"
#include "sync.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tlog.h"
#include "tq.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "tworker.h"
#include "vnode.h"
#include "wal.h"
#ifdef __cplusplus
......@@ -44,71 +39,16 @@ extern int32_t vDebugFlag;
#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct STsdbCfg {
int32_t cacheBlockSize; // MB
int32_t totalBlocks;
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t minRowsPerFileBlock;
int32_t maxRowsPerFileBlock;
uint8_t precision; // time resolution
int8_t compression;
int8_t cacheLastRow;
int8_t update;
} STsdbCfg;
typedef struct SMetaCfg {
} SMetaCfg;
typedef struct SVnodeCfg {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int8_t dropped;
int8_t quorum;
SWalCfg wal;
STsdbCfg tsdb;
SMetaCfg meta;
SSyncCluster sync;
} SVnodeCfg;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
SWal *pWal;
void *pQuery;
SSyncNode *pSync;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SVnodeCfg cfg;
SSyncServerState term;
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t role;
int8_t accessState;
int8_t dropped;
int8_t status;
pthread_mutex_t statusMutex;
typedef struct SVnode {
int32_t vgId;
SVnodeCfg cfg;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
SWal *pWal;
SSyncNode *pSync;
} SVnode;
typedef struct {
int32_t len;
void *rsp;
void *qhandle; // used by query and retrieve msg
} SVnRsp;
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
void vnodeReportStartup(char *name, char *desc);
#ifdef __cplusplus
}
#endif
......
......@@ -16,7 +16,7 @@
#ifndef _TD_VNODE_MEM_ALLOCATOR_H_
#define _TD_VNODE_MEM_ALLOCATOR_H_
#include "os.h"
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_READ_H_
#define _TD_VNODE_READ_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
void vnodeProcessReadMsg(SVnode *pVnode, SVnodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_READ_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_WRITE_H_
#define _TD_VNODE_WRITE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
void vnodeProcessWriteMsg(SVnode* pVnode, SVnodeMsg* pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_WRITE_H_*/
......@@ -13,21 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_CFG_H_
#define _TD_VNODE_CFG_H_
#ifdef __cplusplus
extern "C" {
#endif
#define _DEFAULT_SOURCE
#include "vnodeInt.h"
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState);
int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState);
int32_t vnodeInit() { return 0; }
void vnodeCleanup() {}
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; }
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; }
#ifdef __cplusplus
}
#endif
SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }
void vnodeClose(SVnode *pVnode) {}
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
int32_t vnodeDrop(SVnode *pVnode) { return 0; }
int32_t vnodeCompact(SVnode *pVnode) { return 0; }
int32_t vnodeSync(SVnode *pVnode) { return 0; }
#endif /*_TD_VNODE_CFG_H_*/
int32_t vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) { return 0; }
......@@ -12,3 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnodeRead.h"
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "vnodeWrite.h"
......@@ -6,4 +6,4 @@ target_sources(VMATest
"vnodeMemAllocatorTest.cpp"
)
target_include_directories(VMATest PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc")
target_link_libraries(VMATest os gtest_main)
\ No newline at end of file
target_link_libraries(VMATest os gtest_main vnode)
\ No newline at end of file
......@@ -43,35 +43,33 @@ typedef struct TqMetaList {
typedef struct TqMetaStore {
TqMetaList* inUse[TQ_INUSE_SIZE];
//a table head, key is empty
TqMetaList* unpersistHead;
//deserializer
//serializer
//deleter
int fileFd; //TODO:temporaral use
int idxFd; //TODO:temporaral use
void* (*serializer)(void*);
void* (*deserializer)(void*);
void (*deleter)(void*);
} TqMetaStore;
typedef struct TqMetaPageBuf {
int16_t offset;
char buffer[TQ_PAGE_SIZE];
} TqMetaPageBuf;
TqMetaStore* tqStoreOpen(const char* path, void* serializer(void* ), void* deserializer(void*), void deleter(void*));
int32_t tqStoreClose(TqMetaStore*);
int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*);
int32_t tqStorePersist(TqMetaStore*);
TqMetaStore* TqStoreOpen(const char* path, void* serializer(void* ), void* deserializer(void*));
int32_t TqStoreClose(TqMetaStore*);
int32_t TqStoreDelete(TqMetaStore*);
int32_t TqStoreCommitAll(TqMetaStore*);
int32_t TqStorePersist(TqMetaStore*);
TqMetaHandle* TqHandleGetInUse(TqMetaStore*, int64_t key);
int32_t TqHandlePutInUse(TqMetaStore*, TqMetaHandle* handle);
TqMetaHandle* TqHandleGetInTxn(TqMetaStore*, int64_t key);
int32_t TqHandlePutInTxn(TqMetaStore*, TqMetaHandle* handle);
TqMetaHandle* tqHandleGetInUse(TqMetaStore*, int64_t key);
int32_t tqHandlePutInUse(TqMetaStore*, TqMetaHandle* handle);
TqMetaHandle* tqHandleGetInTxn(TqMetaStore*, int64_t key);
int32_t tqHandlePutInTxn(TqMetaStore*, TqMetaHandle* handle);
//delete in-use-handle, make in-txn-handle in use
int32_t TqHandleCommit(TqMetaStore*, int64_t key);
int32_t tqHandleCommit(TqMetaStore*, int64_t key);
//delete in-txn-handle
int32_t TqHandleAbort(TqMetaStore*, int64_t key);
int32_t tqHandleAbort(TqMetaStore*, int64_t key);
//delete in-use-handle
int32_t TqHandleDel(TqMetaStore*, int64_t key);
int32_t tqHandleDel(TqMetaStore*, int64_t key);
//delete in-use-handle and in-txn-handle
int32_t TqHandleClear(TqMetaStore*, int64_t key);
int32_t tqHandleClear(TqMetaStore*, int64_t key);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tqMetaStore.h"
//TODO:replace by a abstract file layer
#include <fcntl.h>
#include <unistd.h>
typedef struct TqMetaPageBuf {
int16_t offset;
char buffer[TQ_PAGE_SIZE];
} TqMetaPageBuf;
TqMetaStore* tqStoreOpen(const char* path, void* serializer(void*),
void* deserializer(void*), void deleter(void*)) {
//concat data file name and index file name
int fileFd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0755);
if(fileFd < 0) return NULL;
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
if(pMeta == NULL) {
//close
return NULL;
}
memset(pMeta, 0, sizeof(TqMetaStore));
pMeta->fileFd = fileFd;
int idxFd = open(path, O_WRONLY | O_CREAT | O_EXCL, 0755);
if(idxFd < 0) {
//close file
//free memory
return NULL;
}
pMeta->idxFd = idxFd;
pMeta->unpersistHead = malloc(sizeof(TqMetaList));
if(pMeta->unpersistHead == NULL) {
//close file
//free memory
return NULL;
}
pMeta->serializer = serializer;
pMeta->deserializer = deserializer;
pMeta->deleter = deleter;
return pMeta;
}
int32_t tqStoreClose(TqMetaStore* pMeta) {
//commit data and idx
//close file
//free memory
return 0;
}
int32_t tqStoreDelete(TqMetaStore* pMeta) {
//close file
//delete file
//free memory
return 0;
}
int32_t tqStorePersist(TqMetaStore* pMeta) {
TqMetaList *node = pMeta->unpersistHead;
while(node->unpersistNext) {
//serialize
//append data
//write offset and idx
//remove from unpersist list
}
return 0;
}
int32_t tqHandlePutInUse(TqMetaStore* pMeta, TqMetaHandle* handle) {
return 0;
}
TqMetaHandle* tqHandleGetInUse(TqMetaStore* pMeta, int64_t key) {
return NULL;
}
int32_t tqHandlePutInTxn(TqMetaStore* pMeta, TqMetaHandle* handle) {
return 0;
}
TqMetaHandle* tqHandleGetInTxn(TqMetaStore* pMeta, int64_t key) {
return NULL;
}
int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
return 0;
}
int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
return 0;
}
int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
return 0;
}
int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) {
return 0;
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册