提交 ba924223 编写于 作者: H Hongze Cheng

Merge branch '3.0' into feature/vnode

...@@ -672,16 +672,16 @@ typedef struct { ...@@ -672,16 +672,16 @@ typedef struct {
} SDnodeCfg; } SDnodeCfg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t id;
int8_t isMnode; int8_t isMnode;
int8_t reserved; int8_t reserved;
uint16_t dnodePort; uint16_t port;
char dnodeFqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
} SDnodeEp; } SDnodeEp;
typedef struct { typedef struct {
int32_t dnodeNum; int32_t num;
SDnodeEp dnodeEps[]; SDnodeEp eps[];
} SDnodeEps; } SDnodeEps;
typedef struct { typedef struct {
...@@ -820,9 +820,9 @@ typedef struct { ...@@ -820,9 +820,9 @@ typedef struct {
} SCreateDnodeMsg, SDropDnodeMsg; } SCreateDnodeMsg, SDropDnodeMsg;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int8_t replica; int8_t replica;
int8_t reserved[3]; int8_t reserved[3];
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
} SCreateMnodeMsg, SAlterMnodeMsg, SDropMnodeMsg; } SCreateMnodeMsg, SAlterMnodeMsg, SDropMnodeMsg;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_H_
#define _TD_DNODE_H_
#include "tdef.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode;
typedef struct {
/**
* @brief software version of the program.
*
*/
int32_t sver;
/**
* @brief num of CPU cores.
*
*/
int32_t numOfCores;
/**
* @brief number of threads per CPU core.
*
*/
float numOfThreadsPerCore;
/**
* @brief the proportion of total CPU cores available for query processing.
*
*/
float ratioOfQueryCores;
/**
* @brief max number of connections allowed in dnode.
*
*/
int32_t maxShellConns;
/**
* @brief time interval of heart beat from shell to dnode, seconds.
*
*/
int32_t shellActivityTimer;
/**
* @brief time interval of dnode status reporting to mnode, seconds, for cluster only.
*
*/
int32_t statusInterval;
/**
* @brief first port number for the connection (12 continuous UDP/TCP port number are used).
*
*/
uint16_t serverPort;
/**
* @brief data file's directory.
*
*/
char dataDir[PATH_MAX];
/**
* @brief local endpoint.
*
*/
char localEp[TSDB_EP_LEN];
/**
* @brieflocal fully qualified domain name (FQDN).
*
*/
char localFqdn[TSDB_FQDN_LEN];
/**
* @brief first fully qualified domain name (FQDN) for TDengine system.
*
*/
char firstEp[TSDB_EP_LEN];
/**
* @brief system time zone.
*
*/
char timezone[TSDB_TIMEZONE_LEN];
/**
* @brief system locale.
*
*/
char locale[TSDB_LOCALE_LEN];
/**
* @briefdefault system charset.
*
*/
char charset[TSDB_LOCALE_LEN];
} SDnodeOpt;
/* ------------------------ SDnode ------------------------ */
/**
* @brief Initialize and start the dnode.
*
* @param pOptions Options of the dnode.
* @return SDnode* The dnode object.
*/
SDnode *dndInit(SDnodeOpt *pOptions);
/**
* @brief Stop and cleanup the dnode.
*
* @param pDnode The dnode object to close.
*/
void dndCleanup(SDnode *pDnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_H_*/
...@@ -20,17 +20,16 @@ ...@@ -20,17 +20,16 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { MN_MSG_TYPE_WRITE = 1, MN_MSG_TYPE_APPLY, MN_MSG_TYPE_SYNC, MN_MSG_TYPE_READ } EMnMsgType; /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode;
typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg; typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg);
typedef struct { typedef struct SMnodeLoad {
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
} SMnodeCfg;
typedef struct {
int64_t numOfDnode; int64_t numOfDnode;
int64_t numOfMnode; int64_t numOfMnode;
int64_t numOfVgroup; int64_t numOfVgroup;
...@@ -44,29 +43,132 @@ typedef struct { ...@@ -44,29 +43,132 @@ typedef struct {
} SMnodeLoad; } SMnodeLoad;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
void (*SendMsgToDnode)(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); int8_t replica;
void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg); int8_t selfIndex;
void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); SReplica replicas[TSDB_MAX_REPLICA];
int32_t (*PutMsgIntoApplyQueue)(SMnodeMsg *pMsg); struct SDnode *pDnode;
} SMnodePara; PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
int32_t mnodeInit(SMnodePara para); SendMsgToMnodeFp sendMsgToMnodeFp;
void mnodeCleanup(); SendRedirectMsgFp sendRedirectMsgFp;
} SMnodeOptions;
int32_t mnodeDeploy(SMnodeCfg *pCfg);
void mnodeUnDeploy(); /* ------------------------ SMnode ------------------------ */
int32_t mnodeStart(SMnodeCfg *pCfg); /**
int32_t mnodeAlter(SMnodeCfg *pCfg); * @brief Open a mnode.
void mnodeStop(); *
* @param path Path of the mnode
int32_t mnodeGetLoad(SMnodeLoad *pLoad); * @param pOptions Options of the mnode
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); * @return SMnode* The mnode object
*/
SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg); SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions);
void mnodeCleanupMsg(SMnodeMsg *pMsg);
void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType); /**
* @brief Close a mnode
*
* @param pMnode The mnode object to close
*/
void mnodeClose(SMnode *pMnode);
/**
* @brief Close a mnode
*
* @param pMnode The mnode object to close
* @param pOptions Options of the mnode
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions);
/**
* @brief Drop a mnode.
*
* @param path Path of the mnode.
*/
void mnodeDestroy(const char *path);
/**
* @brief Get mnode statistics info
*
* @param pMnode The mnode object
* @param pLoad Statistics of the mnode.
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
/**
* @brief Get user authentication info
*
* @param pMnode The mnode object
* @param user
* @param spi
* @param encrypt
* @param secret
* @param ckey
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
/**
* @brief Initialize mnode msg
*
* @param pMnode The mnode object
* @param pMsg The request rpc msg
* @return int32_t The created mnode msg
*/
SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg);
/**
* @brief Cleanup mnode msg
*
* @param pMsg The request msg
*/
void mnodeCleanupMsg(SMnodeMsg *pMsg);
/**
* @brief Cleanup mnode msg
*
* @param pMsg The request msg
* @param code The error code
*/
void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code);
/**
* @brief Process the read request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg);
/**
* @brief Process the write request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg);
/**
* @brief Process the sync request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg);
/**
* @brief Process the apply request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -56,39 +56,39 @@ extern "C" { ...@@ -56,39 +56,39 @@ extern "C" {
dataPos += valLen; \ dataPos += valLen; \
} }
#define SDB_SET_INT64(pData, dataPos, val) \ #define SDB_SET_INT64(pRaw, dataPos, val) \
{ \ { \
if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \ if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += sizeof(int64_t); \ dataPos += sizeof(int64_t); \
} }
#define SDB_SET_INT32(pData, dataPos, val) \ #define SDB_SET_INT32(pRaw, dataPos, val) \
{ \ { \
if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \ if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += sizeof(int32_t); \ dataPos += sizeof(int32_t); \
} }
#define SDB_SET_INT8(pData, dataPos, val) \ #define SDB_SET_INT8(pRaw, dataPos, val) \
{ \ { \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += sizeof(int8_t); \ dataPos += sizeof(int8_t); \
} }
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \ #define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \
{ \ { \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += valLen; \ dataPos += valLen; \
} }
...@@ -97,7 +97,7 @@ extern "C" { ...@@ -97,7 +97,7 @@ extern "C" {
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \ if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
} }
typedef struct SSdbRaw SSdbRaw; typedef struct SSdbRaw SSdbRaw;
...@@ -144,6 +144,8 @@ typedef struct { ...@@ -144,6 +144,8 @@ typedef struct {
SdbDeleteFp deleteFp; SdbDeleteFp deleteFp;
} SSdbTable; } SSdbTable;
typedef struct SSdb SSdb;
int32_t sdbInit(); int32_t sdbInit();
void sdbCleanup(); void sdbCleanup();
void sdbSetTable(SSdbTable table); void sdbSetTable(SSdbTable table);
......
...@@ -184,10 +184,16 @@ typedef struct { ...@@ -184,10 +184,16 @@ typedef struct {
SRpcMsg rpcMsg[]; SRpcMsg rpcMsg[];
} SVnodeMsg; } SVnodeMsg;
typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg);
typedef struct { typedef struct {
void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg); PutMsgToVnodeQFp putMsgToApplyQueueFp;
void (*SendMsgToMnode)(SRpcMsg *pMsg); SendMsgToDnodeFp sendMsgToDnodeFp;
int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg); SendMsgToMnodeFp sendMsgToMnodeFp;
} SVnodePara; } SVnodePara;
int32_t vnodeInit(SVnodePara); int32_t vnodeInit(SVnodePara);
......
...@@ -51,7 +51,7 @@ typedef struct SRpcMsg { ...@@ -51,7 +51,7 @@ typedef struct SRpcMsg {
} SRpcMsg; } SRpcMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed int sessions; // number of sessions allowed
...@@ -66,10 +66,12 @@ typedef struct SRpcInit { ...@@ -66,10 +66,12 @@ typedef struct SRpcInit {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SEpSet *); void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
void *parent;
} SRpcInit; } SRpcInit;
int32_t rpcInit(); int32_t rpcInit();
......
...@@ -16,11 +16,21 @@ ...@@ -16,11 +16,21 @@
#define _TD_WAL_H_ #define _TD_WAL_H_
#include "os.h" #include "os.h"
#include "tdef.h"
#include "tlog.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
extern int32_t wDebugFlag;
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }}
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }}
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }}
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }}
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
typedef enum { typedef enum {
TAOS_WAL_NOLOG = 0, TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
...@@ -28,9 +38,8 @@ typedef enum { ...@@ -28,9 +38,8 @@ typedef enum {
} EWalType; } EWalType;
typedef struct { typedef struct {
int8_t msgType; int8_t sver;
int8_t sver; // sver 2 for WAL SDataRow/SMemRow compatibility int8_t reserved[3];
int8_t reserved[2];
int32_t len; int32_t len;
int64_t version; int64_t version;
uint32_t signature; uint32_t signature;
...@@ -44,11 +53,33 @@ typedef struct { ...@@ -44,11 +53,33 @@ typedef struct {
EWalType walLevel; // wal level EWalType walLevel; // wal level
} SWalCfg; } SWalCfg;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 1 // 3
typedef struct SWal { typedef struct SWal {
int8_t unused; int64_t version;
int64_t fileId;
int64_t rId;
int64_t tfd;
int32_t vgId;
int32_t keep;
int32_t level;
int32_t fsyncPeriod;
int32_t fsyncSeq;
int8_t stop;
int8_t reseved[3];
char path[WAL_PATH_LEN];
char name[WAL_FILE_LEN];
pthread_mutex_t mutex;
} SWal; // WAL HANDLE } SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, void *pMsg);
// module initialization // module initialization
int32_t walInit(); int32_t walInit();
...@@ -82,6 +113,11 @@ int64_t walGetSnapshotVer(SWal *); ...@@ -82,6 +113,11 @@ int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *); int64_t walGetLastVer(SWal *);
// int32_t walDataCorrupted(SWal*); // int32_t walDataCorrupted(SWal*);
//internal
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -46,6 +46,7 @@ extern "C" { ...@@ -46,6 +46,7 @@ extern "C" {
#include <math.h> #include <math.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/types.h> #include <sys/types.h>
#include <dirent.h>
#include <unistd.h> #include <unistd.h>
#include "osAtomic.h" #include "osAtomic.h"
......
...@@ -20,12 +20,12 @@ ...@@ -20,12 +20,12 @@
extern "C" { extern "C" {
#endif #endif
void taosRemoveDir(const char *dirname); void taosRemoveDir(const char *dirname);
bool taosDirExist(char *dirname); int32_t taosDirExist(char *dirname);
bool taosMkDir(const char *dirname); int32_t taosMkDir(const char *dirname);
void taosRemoveOldFiles(char *dirname, int32_t keepDays); void taosRemoveOldFiles(char *dirname, int32_t keepDays);
bool taosExpandDir(char *dirname, char *outname, int32_t maxlen); int32_t taosExpandDir(char *dirname, char *outname, int32_t maxlen);
bool taosRealPath(char *dirname, int32_t maxlen); int32_t taosRealPath(char *dirname, int32_t maxlen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -68,12 +68,13 @@ int32_t* taosGetErrno(); ...@@ -68,12 +68,13 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106) #define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106)
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107) #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108) #define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0109) #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x010A) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010B) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010C) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D) #define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0113)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010E) #define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0115)
//client //client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation") #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
...@@ -223,20 +224,20 @@ int32_t* taosGetErrno(); ...@@ -223,20 +224,20 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists) #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists)
// dnode // dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed") #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory") #define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE TAOS_DEF_ERROR_CODE(0, 0x0402) //"Mnode Id not match Dnode") #define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402)
#define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed") #define TSDB_CODE_DND_DNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0410)
#define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404) //"Mnode not deployed") #define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411)
#define TSDB_CODE_DND_READ_MNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0405) //"Read mnode.json error") #define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0420)
#define TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0406) //"Write mnode.json error") #define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0421)
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0407) //"No permission for disk files in dnode") #define TSDB_CODE_DND_MNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0422)
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0408) //"Invalid message length") #define TSDB_CODE_DND_MNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0423)
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0409) //"Action in progress") #define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424)
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x040A) //"Too many vnode directories") #define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0425)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x040B) //"Dnode is exiting" #define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0430)
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x040C) //"Parse vnodes.json error") #define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0431)
#define TSDB_CODE_DND_PARSE_DNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x040D) //"Parse dnodes.json error") #define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0432)
// vnode // vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
......
...@@ -82,10 +82,10 @@ extern SGlobalCfg tsGlobalConfig[]; ...@@ -82,10 +82,10 @@ extern SGlobalCfg tsGlobalConfig[];
extern int32_t tsGlobalConfigNum; extern int32_t tsGlobalConfigNum;
extern char * tsCfgStatusStr[]; extern char * tsCfgStatusStr[];
void taosReadGlobalLogCfg(); void taosReadGlobalLogCfg();
bool taosReadGlobalCfg(); int32_t taosReadGlobalCfg();
void taosPrintGlobalCfg(); void taosPrintGlobalCfg();
void taosDumpGlobalCfg(); void taosDumpGlobalCfg();
void taosInitConfigOption(SGlobalCfg cfg); void taosInitConfigOption(SGlobalCfg cfg);
SGlobalCfg *taosGetConfigOption(const char *option); SGlobalCfg *taosGetConfigOption(const char *option);
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_LOG_H #ifndef _TD_UTIL_LOG_H
#define _TD_UTIL_LOG_H #define _TD_UTIL_LOG_H
#include "os.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
......
...@@ -38,9 +38,9 @@ extern SNoteObj tsHttpNote; ...@@ -38,9 +38,9 @@ extern SNoteObj tsHttpNote;
extern SNoteObj tsTscNote; extern SNoteObj tsTscNote;
extern SNoteObj tsInfoNote; extern SNoteObj tsInfoNote;
void taosInitNotes(); int32_t taosInitNotes();
void taosNotePrint(SNoteObj* pNote, const char* const format, ...); void taosNotePrint(SNoteObj* pNote, const char* const format, ...);
void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len); void taosNotePrintBuffer(SNoteObj* pNote, char* buffer, int32_t len);
#define nPrintHttp(...) \ #define nPrintHttp(...) \
if (tsHttpEnableRecordSql) { \ if (tsHttpEnableRecordSql) { \
...@@ -53,7 +53,7 @@ void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len); ...@@ -53,7 +53,7 @@ void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len);
} }
#define nInfo(buffer, len) \ #define nInfo(buffer, len) \
if (tscEmbedded == 1) { \ if (tscEmbeddedInUtil == 1) { \
taosNotePrintBuffer(&tsInfoNote, buffer, len); \ taosNotePrintBuffer(&tsInfoNote, buffer, len); \
} }
......
...@@ -20,20 +20,21 @@ ...@@ -20,20 +20,21 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "tlog.h" #include "tlog.h"
extern int32_t uDebugFlag; extern int32_t uDebugFlag;
extern int8_t tscEmbedded; extern int8_t tscEmbeddedInUtil;
#define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uFatal(...) { if (uDebugFlag & DEBUG_FATAL) { taosPrintLog("UTL FATAL", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uError(...) { if (uDebugFlag & DEBUG_ERROR) { taosPrintLog("UTL ERROR ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uWarn(...) { if (uDebugFlag & DEBUG_WARN) { taosPrintLog("UTL WARN ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbedded ? 255 : uDebugFlag, __VA_ARGS__); }} #define uInfo(...) { if (uDebugFlag & DEBUG_INFO) { taosPrintLog("UTL ", tscEmbeddedInUtil ? 255 : uDebugFlag, __VA_ARGS__); }}
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} #define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }} #define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL ", uDebugFlag, __VA_ARGS__); }}
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); } #define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); } #define pPrint(...) { taosPrintLog("APP ", 255, __VA_ARGS__); }
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -1679,7 +1679,7 @@ int32_t taosCheckGlobalCfg() { ...@@ -1679,7 +1679,7 @@ int32_t taosCheckGlobalCfg() {
taosCheckDataDirCfg(); taosCheckDataDirCfg();
if (!taosDirExist(tsTempDir)) { if (taosDirExist(tsTempDir) != 0) {
return -1; return -1;
} }
......
add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(vnode) add_subdirectory(vnode)
add_subdirectory(qnode) add_subdirectory(qnode)
add_subdirectory(mgmt) add_subdirectory(mgmt)
\ No newline at end of file
aux_source_directory(src DNODE_SRC) add_subdirectory(daemon)
add_executable(taosd ${DNODE_SRC}) add_subdirectory(impl)
target_link_libraries( \ No newline at end of file
taosd
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
PUBLIC wal
PUBLIC sync
PUBLIC taos
)
target_include_directories(
taosd
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
aux_source_directory(src DAEMON_SRC)
add_executable(taosd ${DAEMON_SRC})
target_link_libraries(
taosd
PUBLIC dnode
PUBLIC util
PUBLIC os
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnode.h"
#include "os.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "ulog.h"
static struct {
bool stop;
bool dumpConfig;
bool generateGrant;
bool printAuth;
bool printVersion;
char configDir[PATH_MAX];
} global = {0};
void dmnSigintHandle(int signum, void *info, void *ctx) { global.stop = true; }
void dmnSetSignalHandle() {
taosSetSignal(SIGTERM, dmnSigintHandle);
taosSetSignal(SIGHUP, dmnSigintHandle);
taosSetSignal(SIGINT, dmnSigintHandle);
taosSetSignal(SIGABRT, dmnSigintHandle);
taosSetSignal(SIGBREAK, dmnSigintHandle);
}
int dmnParseOption(int argc, char const *argv[]) {
tstrncpy(global.configDir, "/etc/taos", PATH_MAX);
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow");
return -1;
}
tstrncpy(global.configDir, argv[i], PATH_MAX);
} else {
printf("'-c' requires a parameter, default is %s\n", configDir);
return -1;
}
} else if (strcmp(argv[i], "-C") == 0) {
global.dumpConfig = true;
} else if (strcmp(argv[i], "-k") == 0) {
global.generateGrant = true;
} else if (strcmp(argv[i], "-A") == 0) {
global.printAuth = true;
} else if (strcmp(argv[i], "-V") == 0) {
global.printVersion = true;
} else {
}
}
return 0;
}
void dmnGenerateGrant() {
#if 0
grantParseParameter();
#endif
}
void dmnPrintVersion() {
#ifdef TD_ENTERPRISE
char *releaseName = "enterprise";
#else
char *releaseName = "community";
#endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal);
printf("builuInfo: %s\n", buildinfo);
}
int dmnReadConfig(const char *path) {
taosInitGlobalCfg();
taosReadGlobalLogCfg();
if (taosMkDir(tsLogDir) != 0) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
printf("failed to init log file\n");
return -1;
}
if (taosInitNotes() != 0) {
printf("failed to init log file\n");
return -1;
}
if (taosReadGlobalCfg() != 0) {
uError("failed to read global config");
return -1;
}
if (taosCheckGlobalCfg() != 0) {
uError("failed to check global config");
return -1;
}
taosSetCoreDump(tsEnableCoreFile);
return 0;
}
void dmnDumpConfig() { taosDumpGlobalCfg(); }
void dmnWaitSignal() {
dmnSetSignalHandle();
while (!global.stop) {
taosMsleep(100);
}
}
void dmnInitOption(SDnodeOpt *pOption) {
pOption->sver = tsVersion;
pOption->numOfCores = tsNumOfCores;
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
pOption->ratioOfQueryCores = tsRatioOfQueryCores;
pOption->maxShellConns = tsMaxShellConns;
pOption->shellActivityTimer = tsShellActivityTimer;
pOption->statusInterval = tsStatusInterval;
pOption->serverPort = tsServerPort;
tstrncpy(pOption->dataDir, tsDataDir, TSDB_EP_LEN);
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(pOption->localFqdn, tsLocalEp, TSDB_FQDN_LEN);
tstrncpy(pOption->firstEp, tsFirst, TSDB_FQDN_LEN);
tstrncpy(pOption->timezone, tsLocalEp, TSDB_TIMEZONE_LEN);
tstrncpy(pOption->locale, tsLocalEp, TSDB_LOCALE_LEN);
tstrncpy(pOption->charset, tsLocalEp, TSDB_LOCALE_LEN);
}
int dmnRunDnode() {
SDnodeOpt option = {0};
dmnInitOption(&option);
SDnode *pDnode = dndInit(&option);
if (pDnode == NULL) {
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
return -1;
}
uInfo("Started TDengine service successfully.");
dmnWaitSignal();
uInfo("TDengine is shut down!");
dndCleanup(pDnode);
taosCloseLog();
return 0;
}
int main(int argc, char const *argv[]) {
if (dmnParseOption(argc, argv) != 0) {
return -1;
}
if (global.generateGrant) {
dmnGenerateGrant();
return 0;
}
if (global.printVersion) {
dmnPrintVersion();
return 0;
}
if (dmnReadConfig(global.configDir) != 0) {
return -1;
}
if (global.dumpConfig) {
dmnDumpConfig();
return 0;
}
return dmnRunDnode();
}
aux_source_directory(src DNODE_SRC)
add_library(dnode STATIC ${DNODE_SRC})
target_link_libraries(
dnode
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
PUBLIC wal
PUBLIC sync
PUBLIC taos
)
target_include_directories(
dnode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mgmt"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
...@@ -13,27 +13,27 @@ ...@@ -13,27 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DNODE_DNODE_H_ #ifndef _TD_DND_DNODE_H_
#define _TD_DNODE_DNODE_H_ #define _TD_DND_DNODE_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dndInt.h"
int32_t dnodeInitDnode(); int32_t dndInitDnode(SDnode *pDnd);
void dnodeCleanupDnode(); void dndCleanupDnode(SDnode *pDnd);
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeGetDnodeId(); int32_t dndGetDnodeId(SDnode *pDnd);
int64_t dnodeGetClusterId(); int64_t dndGetClusterId(SDnode *pDnd);
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dnodeGetMnodeEpSetForPeer(SEpSet *epSet); void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet);
void dnodeGetMnodeEpSetForShell(SEpSet *epSet); void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DNODE_DNODE_H_*/ #endif /*_TD_DND_DNODE_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -13,42 +13,117 @@ ...@@ -13,42 +13,117 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DNODE_INT_H_ #ifndef _TD_DND_INT_H_
#define _TD_DNODE_INT_H_ #define _TD_DND_INT_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "cJSON.h"
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h" #include "thash.h"
#include "tlockfree.h"
#include "tlog.h" #include "tlog.h"
#include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "mnode.h"
#include "vnode.h"
#include "dnode.h"
extern int32_t dDebugFlag; extern int32_t dDebugFlag;
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("SRV FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("SRV ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} #define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("SRV WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }} #define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("SRV ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef struct {
char *dnode;
char *mnode;
char *vnodes;
} SDnodeDir;
typedef struct {
int32_t dnodeId;
uint32_t rebootTime;
int32_t dropped;
int64_t clusterId;
SEpSet shellEpSet;
SEpSet peerEpSet;
char *file;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
pthread_mutex_t mutex;
} SDnodeMgmt;
typedef struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SWorkerPool mgmtPool;
SWorkerPool readPool;
SWorkerPool writePool;
SWorkerPool syncPool;
taos_queue pReadQ;
taos_queue pWriteQ;
taos_queue pApplyQ;
taos_queue pSyncQ;
taos_queue pMgmtQ;
char *file;
SMnode *pMnode;
SRWLatch latch;
} SMnodeMgmt;
typedef struct {
SHashObj *hash;
SWorkerPool mgmtPool;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
SMWorkerPool writePool;
taos_queue pMgmtQ;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
} SVnodesMgmt;
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; typedef struct {
typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet); void *serverRpc;
void *clientRpc;
DndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
} STransMgmt;
int32_t dnodeInit(); typedef struct SDnode {
void dnodeCleanup(); EStat stat;
SDnodeOpt opt;
SDnodeDir dir;
SDnodeMgmt d;
SMnodeMgmt m;
SVnodesMgmt vmgmt;
STransMgmt t;
SStartupMsg startup;
} SDnode;
EDnStat dnodeGetRunStat(); EStat dndGetStat(SDnode *pDnode);
void dnodeSetRunStat(); void dndSetStat(SDnode *pDnode, EStat stat);
char *dndStatStr(EStat stat);
void dnodeReportStartup(char *name, char *desc); void dndReportStartup(SDnode *pDnode, char *name, char *desc);
void dnodeReportStartupFinished(char *name, char *desc); void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
void dnodeGetStartup(SStartupMsg *);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DNODE_INT_H_*/ #endif /*_TD_DND_INT_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -13,26 +13,24 @@ ...@@ -13,26 +13,24 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DNODE_VNODES_H_ #ifndef _TD_DND_MNODE_H_
#define _TD_DNODE_VNODES_H_ #define _TD_DND_MNODE_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dndInt.h"
int32_t dnodeInitVnodes(); int32_t dndInitMnode(SDnode *pDnode);
void dnodeCleanupVnodes(); void dndCleanupMnode(SDnode *pDnode);
void dnodeGetVnodeLoads(SVnodeLoads *pVloads); int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DNODE_VNODES_H_*/ #endif /*_TD_DND_MNODE_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -13,21 +13,21 @@ ...@@ -13,21 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DNODE_TRANSPORT_H_ #ifndef _TD_DND_TRANSPORT_H_
#define _TD_DNODE_TRANSPORT_H_ #define _TD_DND_TRANSPORT_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dndInt.h"
int32_t dnodeInitTrans(); int32_t dndInitTrans(SDnode *pDnode);
void dnodeCleanupTrans(); void dndCleanupTrans(SDnode *pDnode);
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg); void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DNODE_TRANSPORT_H_*/ #endif /*_TD_DND_TRANSPORT_H_*/
...@@ -13,25 +13,25 @@ ...@@ -13,25 +13,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_DNODE_MNODE_H_ #ifndef _TD_DND_VNODES_H_
#define _TD_DNODE_MNODE_H_ #define _TD_DND_VNODES_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dndInt.h"
int32_t dnodeInitMnode(); int32_t dndInitVnodes(SDnode *pDnode);
void dnodeCleanupMnode(); void dndCleanupVnodes(SDnode *pDnode);
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey); void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads);
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeReadMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_DNODE_MNODE_H_*/ #endif /*_TD_DND_VNODES_H_*/
\ No newline at end of file \ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndDnode.h"
#include "dndMnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "wal.h"
#include "tcrc32c.h"
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
void dndSetStat(SDnode *pDnode, EStat stat) {
dDebug("dnode stat set from %s to %s", dndStatStr(pDnode->stat), dndStatStr(stat));
pDnode->stat = stat;
}
char *dndStatStr(EStat stat) {
switch (stat) {
case DND_STAT_INIT:
return "init";
case DND_STAT_RUNNING:
return "running";
case DND_STAT_STOPPED:
return "stopped";
default:
return "unknown";
}
}
void dndReportStartup(SDnode *pDnode, char *name, char *desc) {
SStartupMsg *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0;
}
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg));
pStartup->finished = (dndGetStat(pDnode) == DND_STAT_RUNNING);
}
static int32_t dndCheckRunning(char *dataDir) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) {
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
int32_t ret = taosLockFile(fd);
if (ret != 0) {
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(fd);
return -1;
}
return 0;
}
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
if (dndCheckRunning(pOptions->dataDir) != 0) {
return -1;
}
char path[PATH_MAX + 100];
snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.mnode = strdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = strdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.dnode = strdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
dError("failed to malloc dir object");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosMkDir(pDnode->dir.dnode) != 0) {
dError("failed to create dir:%s since %s", pDnode->dir.dnode, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosMkDir(pDnode->dir.mnode) != 0) {
dError("failed to create dir:%s since %s", pDnode->dir.mnode, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (taosMkDir(pDnode->dir.vnodes) != 0) {
dError("failed to create dir:%s since %s", pDnode->dir.vnodes, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0;
}
static void dndCleanupEnv(SDnode *pDnode) {
if (pDnode->dir.mnode != NULL) {
tfree(pDnode->dir.mnode);
}
if (pDnode->dir.vnodes != NULL) {
tfree(pDnode->dir.vnodes);
}
if (pDnode->dir.dnode != NULL) {
tfree(pDnode->dir.dnode);
}
taosStopCacheRefreshWorker();
}
SDnode *dndInit(SDnodeOpt *pOptions) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
SDnode *pDnode = calloc(1, sizeof(pDnode));
if (pDnode == NULL) {
dError("failed to create dnode object");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
dInfo("start to initialize TDengine");
dndSetStat(pDnode, DND_STAT_INIT);
if (dndInitEnv(pDnode, pOptions) != 0) {
dError("failed to init env");
dndCleanup(pDnode);
return NULL;
}
if (rpcInit() != 0) {
dError("failed to init rpc env");
dndCleanup(pDnode);
return NULL;
}
if (walInit() != 0) {
dError("failed to init wal env");
dndCleanup(pDnode);
return NULL;
}
if (dndInitDnode(pDnode) != 0) {
dError("failed to init dnode");
dndCleanup(pDnode);
return NULL;
}
if (dndInitVnodes(pDnode) != 0) {
dError("failed to init vnodes");
dndCleanup(pDnode);
return NULL;
}
if (dndInitMnode(pDnode) != 0) {
dError("failed to init mnode");
dndCleanup(pDnode);
return NULL;
}
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport");
dndCleanup(pDnode);
return NULL;
}
dndSetStat(pDnode, DND_STAT_RUNNING);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully");
return 0;
}
void dndCleanup(SDnode *pDnode) {
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down");
return;
}
dInfo("start to cleanup TDengine");
dndSetStat(pDnode, DND_STAT_STOPPED);
dndCleanupTrans(pDnode);
dndCleanupMnode(pDnode);
dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode);
walCleanUp();
rpcCleanup();
dInfo("TDengine is cleaned up successfully");
dndCleanupEnv(pDnode);
free(pDnode);
}
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
#define _DEFAULT_SOURCE
#include "dndTransport.h"
#include "dndDnode.h"
#include "dndMnode.h"
#include "dndVnodes.h"
static void dndInitMsgFp(STransMgmt *pMgmt) {
// msg from client to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_SUBMIT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_FETCH] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLES_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_ACK] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_RESET] = dndProcessVnodeWriteMsg;
// msg from client to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_USE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SYNC_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dndProcessMnodeWriteMsg;
// message from client to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dndProcessDnodeReq;
// message from mnode to vnode
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
// message from mnode to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_MNODE_IN] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dndProcessDnodeReq;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dndProcessMnodeWriteMsg;
// message from dnode to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dndProcessDnodeRsp;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->t;
int32_t msgType = pMsg->msgType;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
}
}
static int32_t dndInitClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.user = "-internal";
rpcInit.ckey = "-key";
rpcInit.secret = "-secret";
pMgmt->clientRpc = rpcOpen(&rpcInit);
if (pMgmt->clientRpc == NULL) {
dError("failed to init rpc client");
return -1;
}
return 0;
}
static void dndCleanupClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc);
pMgmt->clientRpc = NULL;
dInfo("dnode peer rpc client is closed");
}
}
static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->t;
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
return;
}
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
} else if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
dError("RPC %p, req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->pCont == NULL) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg);
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, req:%s is not processed", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->t;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent;
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) != 0) {
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from mnode since %s", terrstr());
return -1;
}
}
dDebug("user:%s, send auth msg to mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN);
SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
SRpcMsg rpcRsp = {0};
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dDebug("user:%s, success to get user auth from mnodes", user);
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
dndInitMsgFp(pMgmt);
int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->opt.serverPort;
rpcInit.label = "DND-S";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest;
rpcInit.sessions = pDnode->opt.maxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.afp = dndRetrieveUserAuthInfo;
pMgmt->serverRpc = rpcOpen(&rpcInit);
if (pMgmt->serverRpc == NULL) {
dError("failed to init rpc server");
return -1;
}
return 0;
}
static void dndCleanupServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
if (pMgmt->serverRpc) {
rpcClose(pMgmt->serverRpc);
pMgmt->serverRpc = NULL;
}
}
int32_t dndInitTrans(SDnode *pDnode) {
if (dndInitClient(pDnode) != 0) {
return -1;
}
if (dndInitServer(pDnode) != 0) {
return -1;
}
dInfo("dnode-transport is initialized");
return 0;
}
void dndCleanupTrans(SDnode *pDnode) {
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
dInfo("dnode-transport is cleaned up");
}
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransMgmt *pMgmt = &pDnode->t;
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
}
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dndSendMsgToDnode(pDnode, &epSet, pMsg);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeInt.h"
static bool stop = false;
static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; }
static void setSignalHandler() {
taosSetSignal(SIGTERM, sigintHandler);
taosSetSignal(SIGHUP, sigintHandler);
taosSetSignal(SIGINT, sigintHandler);
taosSetSignal(SIGABRT, sigintHandler);
taosSetSignal(SIGBREAK, sigintHandler);
}
int main(int argc, char const *argv[]) {
setSignalHandler();
int32_t code = dnodeInit();
if (code != 0) {
dInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
exit(EXIT_FAILURE);
}
while (!stop) {
taosMsleep(100);
}
dInfo("TDengine is shut down!");
dnodeCleanup();
return 0;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeTransport.h"
#include "dnodeVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "tconfig.h"
#include "tnote.h"
#include "tstep.h"
#include "wal.h"
static struct {
SStartupMsg startup;
EDnStat runStat;
SSteps *steps;
} tsInt;
EDnStat dnodeGetRunStat() { return tsInt.runStat; }
void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; }
void dnodeReportStartup(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0;
}
void dnodeReportStartupFinished(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 1;
}
void dnodeGetStartup(SStartupMsg *pStartup) { memcpy(pStartup, &tsInt.startup, sizeof(SStartupMsg)); }
static int32_t dnodeCheckRunning(char *dir) {
char filepath[256] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dir);
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) {
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
return -1;
}
int32_t ret = taosLockFile(fd);
if (ret != 0) {
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
taosCloseFile(fd);
return -1;
}
return 0;
}
static int32_t dnodeInitDir() {
sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
if (!taosMkDir(tsDnodeDir)) {
dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
return -1;
}
if (!taosMkDir(tsMnodeDir)) {
dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
return -1;
}
if (!taosMkDir(tsVnodeDir)) {
dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno));
return -1;
}
if (dnodeCheckRunning(tsDnodeDir) != 0) {
return -1;
}
return 0;
}
static int32_t dnodeInitMain() {
tsInt.runStat = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n");
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
dInfo("start to initialize TDengine");
taosInitNotes();
if (taosCheckGlobalCfg() != 0) {
return -1;
}
dnodeInitDir();
return 0;
}
static void dnodeCleanupMain() {
taos_cleanup();
taosCloseLog();
taosStopCacheRefreshWorker();
}
int32_t dnodeInit() {
SSteps *steps = taosStepInit(10, dnodeReportStartup);
if (steps == NULL) return -1;
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", NULL, NULL);
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
//taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode);
taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes);
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
tsInt.steps = steps;
taosStepExec(tsInt.steps);
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
dnodeReportStartupFinished("TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully");
return 0;
}
void dnodeCleanup() {
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
dnodeSetRunStat(DN_RUN_STAT_STOPPED);
taosStepCleanup(tsInt.steps);
tsInt.steps = NULL;
}
}
此差异已折叠。
此差异已折叠。
add_subdirectory(impl) add_subdirectory(impl)
add_subdirectory(sdb) add_subdirectory(sdb)
add_subdirectory(transaction)
...@@ -8,7 +8,6 @@ target_include_directories( ...@@ -8,7 +8,6 @@ target_include_directories(
target_link_libraries( target_link_libraries(
mnode mnode
PRIVATE sdb PRIVATE sdb
PRIVATE transaction
PUBLIC transport PUBLIC transport
PUBLIC cjson PUBLIC cjson
) )
\ No newline at end of file
此差异已折叠。
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
int32_t mnodeInitAuth() { return 0; } int32_t mnodeInitAuth() { return 0; }
void mnodeCleanupAuth() {} void mnodeCleanupAuth() {}
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, TSDB_NETTEST_USER) == 0) { if (strcmp(user, TSDB_NETTEST_USER) == 0) {
char pass[32] = {0}; char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass); taosEncryptPass((uint8_t *)user, strlen(user), pass);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "mnodeInt.h" #include "mnodeInt.h"
#include "mnodeTrans.h"
int32_t mnodeInitSync() { return 0; } int32_t mnodeInitSync() { return 0; }
void mnodeCleanUpSync() {} void mnodeCleanUpSync() {}
......
...@@ -174,7 +174,7 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) { ...@@ -174,7 +174,7 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) {
static void mnodeAddRuntimeInfo(SBufferWriter* bw) { static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
SMnodeLoad load = {0}; SMnodeLoad load = {0};
if (mnodeGetLoad(&load) != 0) { if (mnodeGetLoad(NULL, &load) != 0) {
return; return;
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册