提交 fa0d64f1 编写于 作者: S Shengliang Guan

add daemon lib

上级 2f1bb281
......@@ -672,16 +672,16 @@ typedef struct {
} SDnodeCfg;
typedef struct {
int32_t dnodeId;
int32_t id;
int8_t isMnode;
int8_t reserved;
uint16_t dnodePort;
char dnodeFqdn[TSDB_FQDN_LEN];
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
} SDnodeEp;
typedef struct {
int32_t dnodeNum;
SDnodeEp dnodeEps[];
int32_t num;
SDnodeEp eps[];
} SDnodeEps;
typedef struct {
......@@ -820,9 +820,9 @@ typedef struct {
} SCreateDnodeMsg, SDropDnodeMsg;
typedef struct {
int32_t dnodeId;
int8_t replica;
int8_t reserved[3];
int32_t dnodeId;
int8_t replica;
int8_t reserved[3];
SReplica replicas[TSDB_MAX_REPLICA];
} SCreateMnodeMsg, SAlterMnodeMsg, SDropMnodeMsg;
......
......@@ -16,6 +16,8 @@
#ifndef _TD_DNODE_H_
#define _TD_DNODE_H_
#include "tdef.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -23,6 +25,24 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode;
typedef struct {
int32_t sver;
int32_t numOfCores;
float numOfThreadsPerCore;
float ratioOfQueryCores;
int32_t maxShellConns;
int32_t shellActivityTimer;
int32_t statusInterval;
uint16_t serverPort;
char dataDir[PATH_MAX];
char localEp[TSDB_EP_LEN];
char localFqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN];
char timezone[TSDB_TIMEZONE_LEN];
char locale[TSDB_LOCALE_LEN];
char charset[TSDB_LOCALE_LEN];
} SDnodeOpt;
/* ------------------------ SDnode ------------------------ */
/**
* @brief Initialize and start the dnode.
......@@ -30,14 +50,14 @@ typedef struct SDnode SDnode;
* @param cfgPath Config file path.
* @return SDnode* The dnode object.
*/
SDnode *dnodeInit(const char *cfgPath);
SDnode *dndInit(SDnodeOpt *pOptions);
/**
* @brief Stop and cleanup dnode.
*
* @param pDnode The dnode object to close.
* @param pDnd The dnode object to close.
*/
void dnodeCleanup(SDnode *pDnode);
void dndCleanup(SDnode *pDnd);
#ifdef __cplusplus
}
......
......@@ -24,10 +24,10 @@ extern "C" {
typedef struct SDnode SDnode;
typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg);
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnd, SMnodeMsg *pMsg);
typedef struct SMnodeLoad {
int64_t numOfDnode;
......@@ -48,7 +48,7 @@ typedef struct {
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
struct SServer *pServer;
struct SDnode *pDnode;
PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
......@@ -122,10 +122,17 @@ SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg);
/**
* @brief Cleanup mnode msg
*
* @param pMnode The mnode object
* @param pMsg The request msg
*/
void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg);
void mnodeCleanupMsg(SMnodeMsg *pMsg);
/**
* @brief Cleanup mnode msg
*
* @param pMsg The request msg
* @param code The error code
*/
void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code);
/**
* @brief Process the read request
......
......@@ -185,10 +185,10 @@ typedef struct {
} SVnodeMsg;
typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
typedef void (*SendMsgToDnodeFp)(SDnode *pDnd, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnd, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnd, int32_t vgId, SVnodeMsg *pMsg);
typedef struct {
PutMsgToVnodeQFp putMsgToApplyQueueFp;
......
......@@ -51,7 +51,7 @@ typedef struct SRpcMsg {
} SRpcMsg;
typedef struct SRpcInit {
uint16_t localPort; // local port
uint16_t localPort; // local port
char *label; // for debug purpose
int numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed
......@@ -66,10 +66,12 @@ typedef struct SRpcInit {
char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SEpSet *);
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
// call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
// call back to retrieve the client auth info, for server app only
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
void *parent;
} SRpcInit;
int32_t rpcInit();
......
......@@ -68,12 +68,13 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x0106)
#define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107)
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x010A)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x010B)
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x010C)
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x010D)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010E)
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
#define TSDB_CODE_REF_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x0113)
#define TSDB_CODE_REF_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0114)
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0115)
//client
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
......@@ -223,20 +224,20 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) //"Topic already exists)
// dnode
#define TSDB_CODE_DND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0400) //"Message not processed")
#define TSDB_CODE_DND_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0401) //"Dnode out of memory")
#define TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE TAOS_DEF_ERROR_CODE(0, 0x0402) //"Mnode Id not match Dnode")
#define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0403) //"Mnode already deployed")
#define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0404) //"Mnode not deployed")
#define TSDB_CODE_DND_READ_MNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0405) //"Read mnode.json error")
#define TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0406) //"Write mnode.json error")
#define TSDB_CODE_DND_NO_WRITE_ACCESS TAOS_DEF_ERROR_CODE(0, 0x0407) //"No permission for disk files in dnode")
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0408) //"Invalid message length")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0409) //"Action in progress")
#define TSDB_CODE_DND_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x040A) //"Too many vnode directories")
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x040B) //"Dnode is exiting"
#define TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x040C) //"Parse vnodes.json error")
#define TSDB_CODE_DND_PARSE_DNODE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x040D) //"Parse dnodes.json error")
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)
#define TSDB_CODE_DND_EXITING TAOS_DEF_ERROR_CODE(0, 0x0401)
#define TSDB_CODE_DND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0402)
#define TSDB_CODE_DND_DNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0410)
#define TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0411)
#define TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0420)
#define TSDB_CODE_DND_MNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0421)
#define TSDB_CODE_DND_MNODE_ID_INVALID TAOS_DEF_ERROR_CODE(0, 0x0422)
#define TSDB_CODE_DND_MNODE_ID_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0423)
#define TSDB_CODE_DND_MNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0424)
#define TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0425)
#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0430)
#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0431)
#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0432)
// vnode
#define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress")
......
......@@ -16,38 +16,173 @@
#define _DEFAULT_SOURCE
#include "dnode.h"
#include "os.h"
#include "tglobal.h"
#include "ulog.h"
static bool stop = false;
static struct {
bool stop;
bool dumpConfig;
bool generateGrant;
bool printAuth;
bool printVersion;
char configDir[PATH_MAX];
} global = {0};
static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; }
void dmnSigintHandle(int signum, void *info, void *ctx) { global.stop = true; }
static void setSignalHandler() {
taosSetSignal(SIGTERM, sigintHandler);
taosSetSignal(SIGHUP, sigintHandler);
taosSetSignal(SIGINT, sigintHandler);
taosSetSignal(SIGABRT, sigintHandler);
taosSetSignal(SIGBREAK, sigintHandler);
void dmnSetSignalHandle() {
taosSetSignal(SIGTERM, dmnSigintHandle);
taosSetSignal(SIGHUP, dmnSigintHandle);
taosSetSignal(SIGINT, dmnSigintHandle);
taosSetSignal(SIGABRT, dmnSigintHandle);
taosSetSignal(SIGBREAK, dmnSigintHandle);
}
int main(int argc, char const *argv[]) {
const char *path = "/etc/taos";
int dmnParseOpts(int argc, char const *argv[]) {
tstrncpy(global.configDir, "/etc/taos", PATH_MAX);
SDnode *pDnode = dnodeInit(path);
if (pDnode == NULL) {
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
exit(EXIT_FAILURE);
for (int i = 1; i < argc; ++i) {
if (strcmp(argv[i], "-c") == 0) {
if (i < argc - 1) {
if (strlen(argv[++i]) >= PATH_MAX) {
printf("config file path overflow");
return -1;
}
tstrncpy(global.configDir, argv[i], PATH_MAX);
} else {
printf("'-c' requires a parameter, default:%s\n", configDir);
return -1;
}
} else if (strcmp(argv[i], "-C") == 0) {
global.dumpConfig = true;
} else if (strcmp(argv[i], "-k") == 0) {
global.generateGrant = true;
} else if (strcmp(argv[i], "-A") == 0) {
global.printAuth = true;
} else if (strcmp(argv[i], "-V") == 0) {
global.printVersion = true;
} else {
}
}
uInfo("Started TDengine service successfully.");
return 0;
}
void dmnGenerateGrant() { grantParseParameter(); }
void dmnPrintVersion() {
#ifdef TD_ENTERPRISE
char *versionStr = "enterprise";
#else
char *versionStr = "community";
#endif
printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version);
printf("gitinfo: %s\n", gitinfo);
printf("gitinfoI: %s\n", gitinfoOfInternal);
printf("builuInfo: %s\n", buildinfo);
}
int dmnReadConfig(const char *path) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
if (taosMkDir(tsLogDir) != 0) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[PATH_MAX];
snprintf(temp, PATH_MAX, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) {
printf("failed to init log file\n");
return -1;
}
if (taosInitNotes() != 0) {
printf("failed to init log file\n");
return -1;
}
if (taosReadGlobalCfg() != 0) {
uError("failed to read global config");
return -1;
}
if (taosCheckGlobalCfg() != 0) {
uError("failed to check global config");
return -1;
}
taosSetCoreDump(tsEnableCoreFile);
return 0;
}
setSignalHandler();
while (!stop) {
void dmnDumpConfig() { taosDumpGlobalCfg(); }
void dmnWaitSignal() {
dmnSetSignalHandle();
while (!global.stop) {
taosMsleep(100);
}
}
void dmnInitOption(SDnodeOpt *pOpt) {
pOpt->sver = tsVersion;
pOpt->numOfCores = tsNumOfCores;
pOpt->statusInterval = tsStatusInterval;
pOpt->serverPort = tsServerPort;
tstrncpy(pOpt->localEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(pOpt->localFqdn, tsLocalEp, TSDB_FQDN_LEN);
tstrncpy(pOpt->timezone, tsLocalEp, TSDB_TIMEZONE_LEN);
tstrncpy(pOpt->locale, tsLocalEp, TSDB_LOCALE_LEN);
tstrncpy(pOpt->charset, tsLocalEp, TSDB_LOCALE_LEN);
}
int dmnRunDnode() {
SDnodeOpt opt = {0};
dmnInitOption(&opt);
SDnode *pDnd = dndInit(&opt);
if (pDnd == NULL) {
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
return -1;
}
uInfo("Started TDengine service successfully.");
dmnWaitSignal();
uInfo("TDengine is shut down!");
dnodeCleanup(pDnode);
dndCleanup(pDnd);
taosCloseLog();
return 0;
}
int main(int argc, char const *argv[]) {
if (dmnParseOpts(argc, argv) != 0) {
return -1;
}
if (global.generateGrant) {
dmnGenerateGrant();
return 0;
}
if (global.printVersion) {
dmnPrintVersion();
return 0;
}
if (dmnReadConfig(global.configDir) != 0) {
return -1;
}
if (global.dumpConfig) {
dmnDumpConfig();
return 0;
}
return dmnRunDnode();
}
......@@ -13,26 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_VNODES_H_
#define _TD_DNODE_VNODES_H_
#ifndef _TD_DND_DNODE_H_
#define _TD_DND_DNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
#include "dndInt.h"
int32_t dnodeInitVnodes();
void dnodeCleanupVnodes();
void dnodeGetVnodeLoads(SVnodeLoads *pVloads);
int32_t dndInitDnode(SDnode *pDnd);
void dndCleanupDnode(SDnode *pDnd);
void dndProcessDnodeReq(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessDnodeRsp(SDnode *pDnd, SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeQueryMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndGetDnodeId(SDnode *pDnd);
int64_t dndGetClusterId(SDnode *pDnd);
void dndGetDnodeEp(SDnode *pDnd, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dndGetMnodeEpSet(SDnode *pDnd, SEpSet *pEpSet);
void dndSendRedirectMsg(SDnode *pDnd, SRpcMsg *pMsg, bool forShell);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_VNODES_H_*/
\ No newline at end of file
#endif /*_TD_DND_DNODE_H_*/
\ No newline at end of file
......@@ -13,20 +13,27 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_INT_H_
#define _TD_DNODE_INT_H_
#ifndef _TD_DND_INT_H_
#define _TD_DND_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "cJSON.h"
#include "os.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "thash.h"
#include "tlockfree.h"
#include "tlog.h"
#include "tqueue.h"
#include "trpc.h"
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "mnode.h"
#include "vnode.h"
#include "dnode.h"
extern int32_t dDebugFlag;
......@@ -37,8 +44,8 @@ extern int32_t dDebugFlag;
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DN_STAT_INIT, DN_STAT_RUNNING, DN_STAT_STOPPED } EStat;
typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet);
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef struct {
char *dnode;
......@@ -48,50 +55,75 @@ typedef struct {
typedef struct {
int32_t dnodeId;
uint32_t rebootTime;
int32_t dropped;
int64_t clusterId;
SDnodeEps *dnodeEps;
SHashObj *dnodeHash;
SEpSet mnodeEpSetForShell;
SEpSet mnodeEpSetForPeer;
SEpSet shellEpSet;
SEpSet peerEpSet;
char *file;
uint32_t rebootTime;
int8_t dropped;
int8_t threadStop;
SHashObj *dnodeHash;
SDnodeEps *dnodeEps;
pthread_t *threadId;
pthread_mutex_t mutex;
} SDnodeDnode;
} SDnodeMgmt;
typedef struct {
} SDnodeMnode;
int32_t refCount;
int8_t deployed;
int8_t dropped;
SWorkerPool mgmtPool;
SWorkerPool readPool;
SWorkerPool writePool;
SWorkerPool syncPool;
taos_queue pReadQ;
taos_queue pWriteQ;
taos_queue pApplyQ;
taos_queue pSyncQ;
taos_queue pMgmtQ;
char *file;
SMnode *pMnode;
SRWLatch latch;
} SMnodeMgmt;
typedef struct {
} SDnodeVnodes;
SHashObj *hash;
SWorkerPool mgmtPool;
SWorkerPool queryPool;
SWorkerPool fetchPool;
SMWorkerPool syncPool;
SMWorkerPool writePool;
taos_queue pMgmtQ;
int32_t openVnodes;
int32_t totalVnodes;
SRWLatch latch;
} SVnodesMgmt;
typedef struct {
void *peerRpc;
void *shellRpc;
void *clientRpc;
} SDnodeTrans;
void *serverRpc;
void *clientRpc;
DndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
} STransMgmt;
typedef struct SDnode {
EStat stat;
SDnodeDir dir;
SDnodeDnode dnode;
SDnodeVnodes vnodes;
SDnodeMnode mnode;
SDnodeTrans trans;
SStartupMsg startup;
EStat stat;
SDnodeOpt opt;
SDnodeDir dir;
SDnodeMgmt d;
SMnodeMgmt m;
SVnodesMgmt vmgmt;
STransMgmt t;
SStartupMsg startup;
} SDnode;
EStat dnodeGetStat(SDnode *pDnode);
void dnodeSetStat(SDnode *pDnode, EStat stat);
char *dnodeStatStr(EStat stat);
EStat dndGetStat(SDnode *pDnode);
void dndSetStat(SDnode *pDnode, EStat stat);
char *dndStatStr(EStat stat);
void dnodeReportStartup(SDnode *pDnode, char *name, char *desc);
void dnodeGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
void dndReportStartup(SDnode *pDnode, char *name, char *desc);
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_INT_H_*/
\ No newline at end of file
#endif /*_TD_DND_INT_H_*/
\ No newline at end of file
......@@ -13,27 +13,24 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_DNODE_H_
#define _TD_DNODE_DNODE_H_
#ifndef _TD_DND_MNODE_H_
#define _TD_DND_MNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
#include "dndInt.h"
int32_t dnodeInitDnode(SDnode *pDnode);
void dnodeCleanupDnode(SDnode *pDnode);
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeGetDnodeId();
int64_t dnodeGetClusterId();
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetMnodeEpSetForPeer(SEpSet *epSet);
void dnodeGetMnodeEpSetForShell(SEpSet *epSet);
void dnodeSendRedirectMsg(SDnode *pDnode, SRpcMsg *rpcMsg, bool forShell);
int32_t dndInitMnode(SDnode *pDnode);
void dndCleanupMnode(SDnode *pDnode);
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_DNODE_H_*/
\ No newline at end of file
#endif /*_TD_DND_MNODE_H_*/
\ No newline at end of file
......@@ -13,21 +13,21 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_TRANSPORT_H_
#define _TD_DNODE_TRANSPORT_H_
#ifndef _TD_DND_TRANSPORT_H_
#define _TD_DND_TRANSPORT_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
#include "dndInt.h"
int32_t dnodeInitTrans();
void dnodeCleanupTrans();
void dnodeSendMsgToMnode(SDnode *pDnode, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SDnode *pDnode, SEpSet *epSet, SRpcMsg *rpcMsg);
int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode);
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_TRANSPORT_H_*/
#endif /*_TD_DND_TRANSPORT_H_*/
......@@ -13,25 +13,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_MNODE_H_
#define _TD_DNODE_MNODE_H_
#ifndef _TD_DND_VNODES_H_
#define _TD_DND_VNODES_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
#include "dndInt.h"
int32_t dnodeInitMnode();
void dnodeCleanupMnode();
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void dnodeProcessMnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeReadMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeProcessMnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads);
void dndProcessVnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_MNODE_H_*/
\ No newline at end of file
#endif /*_TD_DND_VNODES_H_*/
\ No newline at end of file
......@@ -14,50 +14,47 @@
*/
#define _DEFAULT_SOURCE
#include "dnodeDnode.h"
#include "dnodeMnode.h"
#include "dnodeTransport.h"
#include "dnodeVnodes.h"
#include "dndDnode.h"
#include "dndMnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "sync.h"
#include "tcache.h"
#include "tconfig.h"
#include "tnote.h"
#include "tstep.h"
#include "wal.h"
EStat dnodeGetStat(SDnode *pDnode) { return pDnode->stat; }
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
void dnodeSetStat(SDnode *pDnode, EStat stat) {
dDebug("dnode stat set from %s to %s", dnodeStatStr(pDnode->stat), dnodeStatStr(stat));
void dndSetStat(SDnode *pDnode, EStat stat) {
dDebug("dnode stat set from %s to %s", dndStatStr(pDnode->stat), dndStatStr(stat));
pDnode->stat = stat;
}
char *dnodeStatStr(EStat stat) {
char *dndStatStr(EStat stat) {
switch (stat) {
case DN_STAT_INIT:
case DND_STAT_INIT:
return "init";
case DN_STAT_RUNNING:
case DND_STAT_RUNNING:
return "running";
case DN_STAT_STOPPED:
case DND_STAT_STOPPED:
return "stopped";
default:
return "unknown";
}
}
void dnodeReportStartup(SDnode *pDnode, char *name, char *desc) {
void dndReportStartup(SDnode *pDnode, char *name, char *desc) {
SStartupMsg *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0;
}
void dnodeGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg);
pStartup->finished = (dnodeGetStat(pDnode) == DN_STAT_RUNNING);
void dndGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg));
pStartup->finished = (dndGetStat(pDnode) == DND_STAT_RUNNING);
}
static int32_t dnodeCheckRunning(char *dataDir) {
static int32_t dndCheckRunning(char *dataDir) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
......@@ -79,15 +76,19 @@ static int32_t dnodeCheckRunning(char *dataDir) {
return 0;
}
static int32_t dnodeInitDisk(SDnode *pDnode, char *dataDir) {
char path[PATH_MAX];
snprintf(path, PATH_MAX, "%s/mnode", dataDir);
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
if (dndCheckRunning(pOptions->dataDir) != 0) {
return -1;
}
char path[PATH_MAX + 100];
snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.mnode = strdup(path);
sprintf(path, PATH_MAX, "%s/vnode", dataDir);
snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = strdup(path);
sprintf(path, PATH_MAX, "%s/dnode", dataDir);
snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP);
pDnode->dir.dnode = strdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
......@@ -114,55 +115,10 @@ static int32_t dnodeInitDisk(SDnode *pDnode, char *dataDir) {
return -1;
}
if (dnodeCheckRunning(dataDir) != 0) {
return -1;
}
return 0;
}
static int32_t dnodeInitEnv(SDnode *pDnode, const char *cfgPath) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
taosInitGlobalCfg();
taosReadGlobalLogCfg();
taosSetCoreDump(tsEnableCoreFile);
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
dError("failed to init log file\n");
return -1;
}
if (!taosReadGlobalCfg()) {
taosPrintGlobalCfg();
dError("TDengine read global config failed");
return -1;
}
taosInitNotes();
if (taosCheckGlobalCfg() != 0) {
dError("TDengine check global config failed");
return -1;
}
if (dnodeInitDisk(pDnode, tsDataDir) != 0) {
dError("TDengine failed to init directory");
return -1;
}
return 0;
}
static void dnodeCleanupEnv(SDnode *pDnode) {
static void dndCleanupEnv(SDnode *pDnode) {
if (pDnode->dir.mnode != NULL) {
tfree(pDnode->dir.mnode);
}
......@@ -175,11 +131,10 @@ static void dnodeCleanupEnv(SDnode *pDnode) {
tfree(pDnode->dir.dnode);
}
taosCloseLog();
taosStopCacheRefreshWorker();
}
SDnode *dnodeInit(const char *cfgPath) {
SDnode *dndInit(SDnodeOpt *pOptions) {
SDnode *pDnode = calloc(1, sizeof(pDnode));
if (pDnode == NULL) {
dError("failed to create dnode object");
......@@ -188,73 +143,73 @@ SDnode *dnodeInit(const char *cfgPath) {
}
dInfo("start to initialize TDengine");
dnodeSetStat(pDnode, DN_STAT_INIT);
dndSetStat(pDnode, DND_STAT_INIT);
if (dnodeInitEnv(pDnode, cfgPath) != 0) {
if (dndInitEnv(pDnode, pOptions) != 0) {
dError("failed to init env");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
if (rpcInit() != 0) {
dError("failed to init rpc env");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
if (walInit() != 0) {
dError("failed to init wal env");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
if (dnodeInitDnode(pDnode) != 0) {
if (dndInitDnode(pDnode) != 0) {
dError("failed to init dnode");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
if (dnodeInitVnodes(pDnode) != 0) {
if (dndInitVnodes(pDnode) != 0) {
dError("failed to init vnodes");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
if (dnodeInitMnode(pDnode) != 0) {
if (dndInitMnode(pDnode) != 0) {
dError("failed to init mnode");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
if (dnodeInitTrans(pDnode) != 0) {
if (dndInitTrans(pDnode) != 0) {
dError("failed to init transport");
dnodeCleanup(pDnode);
dndCleanup(pDnode);
return NULL;
}
dnodeSetStat(pDnode, DN_STAT_RUNNING);
dnodeReportStartup(pDnode, "TDengine", "initialized successfully");
dndSetStat(pDnode, DND_STAT_RUNNING);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully");
return 0;
}
void dnodeCleanup(SDnode *pDnode) {
if (dnodeGetStat(pDnode) == DN_STAT_STOPPED) {
void dndCleanup(SDnode *pDnode) {
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("dnode is shutting down");
return;
}
dInfo("start to cleanup TDengine");
dnodeSetStat(pDnode, DN_STAT_STOPPED);
dnodeCleanupTrans(pDnode);
dnodeCleanupMnode(pDnode);
dnodeCleanupVnodes(pDnode);
dnodeCleanupDnode(pDnode);
dndSetStat(pDnode, DND_STAT_STOPPED);
dndCleanupTrans(pDnode);
dndCleanupMnode(pDnode);
dndCleanupVnodes(pDnode);
dndCleanupDnode(pDnode);
walCleanUp();
rpcCleanup();
dInfo("TDengine is cleaned up successfully");
dnodeCleanupEnv(pDnode);
dndCleanupEnv(pDnode);
free(pDnode);
}
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here
*/
#define _DEFAULT_SOURCE
#include "dndTransport.h"
#include "dndDnode.h"
#include "dndMnode.h"
#include "dndVnodes.h"
static void dndInitMsgFp(STransMgmt *pMgmt) {
// msg from client to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_SUBMIT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_FETCH] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLE_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_TABLES_META] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_QUERY] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_ACK] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_MQ_RESET] = dndProcessVnodeWriteMsg;
// msg from client to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_ACCT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_USER] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DNODE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_USE_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SYNC_DB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = dndProcessMnodeWriteMsg;
// message from client to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dndProcessDnodeReq;
// message from mnode to vnode
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = dndProcessMnodeWriteMsg;
// message from mnode to dnode
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = dndProcessVnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_MNODE_IN] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dndProcessMnodeMgmtMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = dndProcessDnodeReq;
pMgmt->msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = dndProcessMnodeWriteMsg;
// message from dnode to mnode
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_AUTH_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_GRANT_RSP] = dndProcessDnodeRsp;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dndProcessDnodeRsp;
}
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->t;
int32_t msgType = pMsg->msgType;
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:%s", pMsg->handle, taosMsg[msgType], tstrerror(pMsg->code));
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont);
}
}
static int32_t dndInitClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.user = "-internal";
rpcInit.ckey = "-key";
rpcInit.secret = "-secret";
pMgmt->clientRpc = rpcOpen(&rpcInit);
if (pMgmt->clientRpc == NULL) {
dError("failed to init rpc client");
return -1;
}
return 0;
}
static void dndCleanupClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc);
pMgmt->clientRpc = NULL;
dInfo("dnode peer rpc client is closed");
}
}
static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->t;
int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
return;
}
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
dError("RPC %p, req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_EXITING};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
} else if (dndGetStat(pDnode) != DND_STAT_RUNNING) {
dError("RPC %p, req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
return;
}
if (pMsg->pCont == NULL) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
rpcSendResponse(&rspMsg);
return;
}
DndMsgFp fp = pMgmt->msgFp[msgType];
if (fp != NULL) {
dTrace("RPC %p, req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pDnode, pMsg, pEpSet);
} else {
dError("RPC %p, req:%s is not processed", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
}
static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRpcRsp) {
STransMgmt *pMgmt = &pDnode->t;
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
}
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent;
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) != 0) {
if (terrno != TSDB_CODE_APP_NOT_READY) {
dTrace("failed to get user auth from mnode since %s", terrstr());
return -1;
}
}
dDebug("user:%s, send auth msg to mnodes", user);
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
tstrncpy(pMsg->user, user, TSDB_USER_LEN);
SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TSDB_MSG_TYPE_AUTH};
SRpcMsg rpcRsp = {0};
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dDebug("user:%s, success to get user auth from mnodes", user);
}
rpcFreeCont(rpcRsp.pCont);
return rpcRsp.code;
}
static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
dndInitMsgFp(pMgmt);
int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->opt.serverPort;
rpcInit.label = "DND-S";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest;
rpcInit.sessions = pDnode->opt.maxShellConns;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
rpcInit.afp = dndRetrieveUserAuthInfo;
pMgmt->serverRpc = rpcOpen(&rpcInit);
if (pMgmt->serverRpc == NULL) {
dError("failed to init rpc server");
return -1;
}
return 0;
}
static void dndCleanupServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->t;
if (pMgmt->serverRpc) {
rpcClose(pMgmt->serverRpc);
pMgmt->serverRpc = NULL;
}
}
int32_t dndInitTrans(SDnode *pDnode) {
if (dndInitClient(pDnode) != 0) {
return -1;
}
if (dndInitServer(pDnode) != 0) {
return -1;
}
dInfo("dnode-transport is initialized");
return 0;
}
void dndCleanupTrans(SDnode *pDnode) {
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
dInfo("dnode-transport is cleaned up");
}
void dndSendMsgToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransMgmt *pMgmt = &pDnode->t;
rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL);
}
void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dndSendMsgToDnode(pDnode, &epSet, pMsg);
}
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dnodeMnode.h"
#include "cJSON.h"
#include "dnodeDnode.h"
#include "dnodeTransport.h"
#include "mnode.h"
#include "tlockfree.h"
#include "tqueue.h"
#include "tstep.h"
#include "tworker.h"
static struct {
int32_t refCount;
int8_t deployed;
int8_t dropped;
SWorkerPool mgmtPool;
SWorkerPool readPool;
SWorkerPool writePool;
SWorkerPool syncPool;
taos_queue pReadQ;
taos_queue pWriteQ;
taos_queue pApplyQ;
taos_queue pSyncQ;
taos_queue pMgmtQ;
SSteps *pSteps;
SMnode *pMnode;
SRWLatch latch;
} tsMnode = {0};
static int32_t dnodeAllocMnodeReadQueue();
static void dnodeFreeMnodeReadQueue();
static int32_t dnodeAllocMnodeWriteQueue();
static void dnodeFreeMnodeWriteQueue();
static int32_t dnodeAllocMnodeApplyQueue();
static void dnodeFreeMnodeApplyQueue();
static int32_t dnodeAllocMnodeSyncQueue();
static void dnodeFreeMnodeSyncQueue();
static SMnode *dnodeAcquireMnode() {
SMnode *pMnode = NULL;
taosRLockLatch(&tsMnode.latch);
if (tsMnode.deployed) {
atomic_add_fetch_32(&tsMnode.refCount, 1);
pMnode = tsMnode.pMnode;
}
taosRUnLockLatch(&tsMnode.latch);
return pMnode;
}
static void dnodeReleaseMnode(SMnode *pMnode) {
taosRLockLatch(&tsMnode.latch);
atomic_sub_fetch_32(&tsMnode.refCount, 1);
taosRUnLockLatch(&tsMnode.latch);
}
static int32_t dnodeReadMnodeFile() {
int32_t code = TSDB_CODE_DND_READ_MNODE_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 300;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
char file[PATH_MAX + 20] = {0};
snprintf(file, sizeof(file), "%s/mnode.json", tsDnodeDir);
fp = fopen(file, "r");
if (!fp) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_MNODE_OVER;
}
len = (int32_t)fread(content, 1, maxLen, fp);
if (len <= 0) {
dError("failed to read %s since content is null", file);
goto PRASE_MNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", file);
goto PRASE_MNODE_OVER;
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_String) {
dError("failed to read %s since deployed not found", file);
goto PRASE_MNODE_OVER;
}
tsMnode.deployed = atoi(deployed->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_String) {
dError("failed to read %s since dropped not found", file);
goto PRASE_MNODE_OVER;
}
tsMnode.dropped = atoi(dropped->valuestring);
code = 0;
dInfo("succcessed to read file %s", file);
PRASE_MNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (fp != NULL) fclose(fp);
return code;
}
static int32_t dnodeWriteMnodeFile() {
char file[PATH_MAX + 20] = {0};
char realfile[PATH_MAX + 20] = {0};
snprintf(file, sizeof(file), "%s/mnode.json.bak", tsDnodeDir);
snprintf(realfile, sizeof(realfile), "%s/mnode.json", tsDnodeDir);
FILE *fp = fopen(file, "w");
if (!fp) {
dError("failed to write %s since %s", file, strerror(errno));
return TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR;
}
int32_t len = 0;
int32_t maxLen = 300;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": \"%d\",\n", tsMnode.deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\"\n", tsMnode.dropped);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
int32_t code = taosRenameFile(file, realfile);
if (code != 0) {
dError("failed to rename %s since %s", file, tstrerror(code));
return TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR;
}
dInfo("successed to write %s", realfile);
return 0;
}
static int32_t dnodeStartMnode() {
int32_t code = dnodeAllocMnodeReadQueue();
if (code != 0) {
return code;
}
code = dnodeAllocMnodeWriteQueue();
if (code != 0) {
return code;
}
code = dnodeAllocMnodeApplyQueue();
if (code != 0) {
return code;
}
code = dnodeAllocMnodeSyncQueue();
if (code != 0) {
return code;
}
taosWLockLatch(&tsMnode.latch);
tsMnode.deployed = 1;
taosWUnLockLatch(&tsMnode.latch);
return mnodeStart(NULL);
}
static void dnodeStopMnode() {
taosWLockLatch(&tsMnode.latch);
tsMnode.deployed = 0;
taosWUnLockLatch(&tsMnode.latch);
dnodeReleaseMnode();
while (tsMnode.refCount > 0) taosMsleep(10);
while (!taosQueueEmpty(tsMnode.pReadQ)) taosMsleep(10);
while (!taosQueueEmpty(tsMnode.pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(tsMnode.pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(tsMnode.pSyncQ)) taosMsleep(10);
dnodeFreeMnodeReadQueue();
dnodeFreeMnodeWriteQueue();
dnodeFreeMnodeApplyQueue();
dnodeFreeMnodeSyncQueue();
}
static int32_t dnodeUnDeployMnode() {
tsMnode.dropped = 1;
int32_t code = dnodeWriteMnodeFile();
if (code != 0) {
tsMnode.dropped = 0;
dError("failed to undeploy mnode since %s", tstrerror(code));
return code;
}
dnodeStopMnode();
mnodeUnDeploy();
dnodeWriteMnodeFile();
return code;
}
static int32_t dnodeDeployMnode(SMnodeCfg *pCfg) {
int32_t code = mnodeDeploy(pCfg);
if (code != 0) {
dError("failed to deploy mnode since %s", tstrerror(code));
return code;
}
code = dnodeStartMnode();
if (code != 0) {
dnodeUnDeployMnode();
dError("failed to deploy mnode since %s", tstrerror(code));
return code;
}
code = dnodeWriteMnodeFile();
if (code != 0) {
dnodeUnDeployMnode();
dError("failed to deploy mnode since %s", tstrerror(code));
return code;
}
dInfo("deploy mnode success");
return code;
}
static int32_t dnodeAlterMnode(SMnodeCfg *pCfg) {
int32_t code = dnodeAcquireMnode();
if (code == 0) {
code = mnodeAlter(pCfg);
dnodeReleaseMnode();
}
return code;
}
static SCreateMnodeMsg *dnodeParseCreateMnodeMsg(SRpcMsg *pRpcMsg) {
SCreateMnodeMsg *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
for (int32_t i = 0; i < pMsg->replica; ++i) {
pMsg->replicas[i].port = htons(pMsg->replicas[i].port);
}
return pMsg;
}
static int32_t dnodeProcessCreateMnodeReq(SRpcMsg *pRpcMsg) {
SAlterMnodeMsg *pMsg = (SAlterMnodeMsg *)dnodeParseCreateMnodeMsg(pRpcMsg->pCont);
if (pMsg->dnodeId != dnodeGetDnodeId()) {
return TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE;
} else {
SMnodeCfg cfg = {0};
cfg.replica = pMsg->replica;
memcpy(cfg.replicas, pMsg->replicas, sizeof(SReplica) * sizeof(TSDB_MAX_REPLICA));
return dnodeDeployMnode(&cfg);
}
}
static int32_t dnodeProcessAlterMnodeReq(SRpcMsg *pRpcMsg) {
SAlterMnodeMsg *pMsg = (SAlterMnodeMsg *)dnodeParseCreateMnodeMsg(pRpcMsg->pCont);
if (pMsg->dnodeId != dnodeGetDnodeId()) {
return TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE;
} else {
SMnodeCfg cfg = {0};
cfg.replica = pMsg->replica;
memcpy(cfg.replicas, pMsg->replicas, sizeof(SReplica) * sizeof(TSDB_MAX_REPLICA));
return dnodeAlterMnode(&cfg);
}
}
static int32_t dnodeProcessDropMnodeReq(SRpcMsg *pMsg) {
SAlterMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId()) {
return TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE;
} else {
return dnodeUnDeployMnode();
}
}
static void dnodeProcessMnodeMgmtQueue(void *unused, SRpcMsg *pMsg) {
int32_t code = 0;
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_MNODE_IN:
code = dnodeProcessCreateMnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_ALTER_MNODE_IN:
code = dnodeProcessAlterMnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_DROP_MNODE_IN:
code = dnodeProcessDropMnodeReq(pMsg);
break;
default:
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
break;
}
SRpcMsg rsp = {.code = code, .handle = pMsg->handle};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dnodeProcessMnodeReadQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_READ); }
static void dnodeProcessMnodeWriteQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_WRITE); }
static void dnodeProcessMnodeApplyQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_APPLY); }
static void dnodeProcessMnodeSyncQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_SYNC); }
static int32_t dnodeWriteMnodeMsgToQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0;
SMnodeMsg *pMsg = NULL;
if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else {
pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) {
code = terrno;
}
}
if (code == 0) {
code = taosWriteQitem(pQueue, pMsg);
}
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
rpcFreeCont(pRpcMsg->pCont);
}
}
void dnodeProcessMnodeMgmtMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteMnodeMsgToQueue(tsMnode.pMgmtQ, pMsg); }
void dnodeProcessMnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
if (dnodeAcquireMnode() == 0) {
dnodeWriteMnodeMsgToQueue(tsMnode.pWriteQ, pMsg);
dnodeReleaseMnode();
} else {
dnodeSendRedirectMsg(NULL, pMsg, 0);
}
}
void dnodeProcessMnodeSyncMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t code = dnodeAcquireMnode();
if (code == 0) {
dnodeWriteMnodeMsgToQueue(tsMnode.pSyncQ, pMsg);
dnodeReleaseMnode();
} else {
SRpcMsg rsp = {.handle = pMsg->handle, .code = code};
rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
}
}
void dnodeProcessMnodeReadMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
if (dnodeAcquireMnode() == 0) {
dnodeWriteMnodeMsgToQueue(tsMnode.pReadQ, pMsg);
dnodeReleaseMnode();
} else {
dnodeSendRedirectMsg(NULL, pMsg, 0);
}
}
static int32_t dnodePutMsgIntoMnodeApplyQueue(SMnodeMsg *pMsg) {
int32_t code = dnodeAcquireMnode();
if (code != 0) return code;
code = taosWriteQitem(tsMnode.pApplyQ, pMsg);
dnodeReleaseMnode();
return code;
}
static int32_t dnodeAllocMnodeMgmtQueue() {
tsMnode.pMgmtQ = tWorkerAllocQueue(&tsMnode.mgmtPool, NULL, (FProcessItem)dnodeProcessMnodeMgmtQueue);
if (tsMnode.pMgmtQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeMnodeMgmtQueue() {
tWorkerFreeQueue(&tsMnode.mgmtPool, tsMnode.pMgmtQ);
tsMnode.pMgmtQ = NULL;
}
static int32_t dnodeInitMnodeMgmtWorker() {
SWorkerPool *pPool = &tsMnode.mgmtPool;
pPool->name = "mnode-mgmt";
pPool->min = 1;
pPool->max = 1;
return tWorkerInit(pPool);
}
static void dnodeCleanupMnodeMgmtWorker() { tWorkerCleanup(&tsMnode.mgmtPool); }
static int32_t dnodeAllocMnodeReadQueue() {
tsMnode.pReadQ = tWorkerAllocQueue(&tsMnode.readPool, NULL, (FProcessItem)dnodeProcessMnodeReadQueue);
if (tsMnode.pReadQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeMnodeReadQueue() {
tWorkerFreeQueue(&tsMnode.readPool, tsMnode.pReadQ);
tsMnode.pReadQ = NULL;
}
static int32_t dnodeInitMnodeReadWorker() {
SWorkerPool *pPool = &tsMnode.readPool;
pPool->name = "mnode-read";
pPool->min = 0;
pPool->max = 1;
return tWorkerInit(pPool);
}
static void dnodeCleanupMnodeReadWorker() { tWorkerCleanup(&tsMnode.readPool); }
static int32_t dnodeAllocMnodeWriteQueue() {
tsMnode.pWriteQ = tWorkerAllocQueue(&tsMnode.writePool, NULL, (FProcessItem)dnodeProcessMnodeWriteQueue);
if (tsMnode.pWriteQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeMnodeWriteQueue() {
tWorkerFreeQueue(&tsMnode.writePool, tsMnode.pWriteQ);
tsMnode.pWriteQ = NULL;
}
static int32_t dnodeAllocMnodeApplyQueue() {
tsMnode.pApplyQ = tWorkerAllocQueue(&tsMnode.writePool, NULL, (FProcessItem)dnodeProcessMnodeApplyQueue);
if (tsMnode.pApplyQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeMnodeApplyQueue() {
tWorkerFreeQueue(&tsMnode.writePool, tsMnode.pApplyQ);
tsMnode.pApplyQ = NULL;
}
static int32_t dnodeInitMnodeWriteWorker() {
SWorkerPool *pPool = &tsMnode.writePool;
pPool->name = "mnode-write";
pPool->min = 0;
pPool->max = 1;
return tWorkerInit(pPool);
}
static void dnodeCleanupMnodeWriteWorker() { tWorkerCleanup(&tsMnode.writePool); }
static int32_t dnodeAllocMnodeSyncQueue() {
tsMnode.pSyncQ = tWorkerAllocQueue(&tsMnode.syncPool, NULL, (FProcessItem)dnodeProcessMnodeSyncQueue);
if (tsMnode.pSyncQ == NULL) {
return TSDB_CODE_DND_OUT_OF_MEMORY;
}
return 0;
}
static void dnodeFreeMnodeSyncQueue() {
tWorkerFreeQueue(&tsMnode.syncPool, tsMnode.pSyncQ);
tsMnode.pSyncQ = NULL;
}
static int32_t dnodeInitMnodeSyncWorker() {
SWorkerPool *pPool = &tsMnode.syncPool;
pPool->name = "mnode-sync";
pPool->min = 0;
pPool->max = 1;
return tWorkerInit(pPool);
}
static void dnodeCleanupMnodeSyncWorker() { tWorkerCleanup(&tsMnode.syncPool); }
static int32_t dnodeInitMnodeModule() {
taosInitRWLatch(&tsMnode.latch);
SMnodeOptions para;
para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId();
para.sendMsgToDnodeFp = dnodeSendMsgToDnode;
para.sendMsgToMnodeFp = dnodeSendMsgToMnode;
para.sendRedirectMsgFp = dnodeSendRedirectMsg;
tsMnode.pMnode = mnodeCreate(para);
if (tsMnode.pMnode != NULL) {
return -1;
}
return 0;
}
static void dnodeCleanupMnodeModule() { mnodeDrop(NULL); }
static bool dnodeNeedDeployMnode() {
if (dnodeGetDnodeId() > 0) return false;
if (dnodeGetClusterId() > 0) return false;
if (strcmp(tsFirst, tsLocalEp) != 0) return false;
return true;
}
static int32_t dnodeOpenMnode() {
int32_t code = dnodeReadMnodeFile();
if (code != 0) {
dError("failed to read open mnode since %s", tstrerror(code));
return code;
}
if (tsMnode.dropped) {
dInfo("mnode already dropped, undeploy it");
return dnodeUnDeployMnode();
}
if (!tsMnode.deployed) {
bool needDeploy = dnodeNeedDeployMnode();
if (!needDeploy) return 0;
dInfo("start to deploy mnode");
SMnodeCfg cfg = {.replica = 1};
cfg.replicas[0].port = tsServerPort;
tstrncpy(cfg.replicas[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
code = dnodeDeployMnode(&cfg);
} else {
dInfo("start to open mnode");
return dnodeStartMnode();
}
}
static void dnodeCloseMnode() {
if (dnodeAcquireMnode() == 0) {
dnodeStopMnode();
}
}
int32_t dnodeInitMnode() {
dInfo("dnode-mnode start to init");
SSteps *pSteps = taosStepInit(6, dnodeReportStartup);
taosStepAdd(pSteps, "dnode-mnode-env", dnodeInitMnodeModule, dnodeCleanupMnodeModule);
taosStepAdd(pSteps, "dnode-mnode-mgmt", dnodeInitMnodeMgmtWorker, dnodeCleanupMnodeMgmtWorker);
taosStepAdd(pSteps, "dnode-mnode-read", dnodeInitMnodeReadWorker, dnodeCleanupMnodeReadWorker);
taosStepAdd(pSteps, "dnode-mnode-write", dnodeInitMnodeWriteWorker, dnodeCleanupMnodeWriteWorker);
taosStepAdd(pSteps, "dnode-mnode-sync", dnodeInitMnodeSyncWorker, dnodeCleanupMnodeSyncWorker);
taosStepAdd(pSteps, "dnode-mnode", dnodeOpenMnode, dnodeCloseMnode);
tsMnode.pSteps = pSteps;
int32_t code = taosStepExec(pSteps);
if (code != 0) {
dError("dnode-mnode init failed since %s", tstrerror(code));
} else {
dInfo("dnode-mnode is initialized");
}
}
void dnodeCleanupMnode() {
if (tsMnode.pSteps == NULL) {
dInfo("dnode-mnode start to clean up");
taosStepCleanup(tsMnode.pSteps);
tsMnode.pSteps = NULL;
dInfo("dnode-mnode is cleaned up");
}
}
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnode *pMnode = dnodeAcquireMnode();
if (pMnode == NULL) {
dTrace("failed to get user auth since mnode not deployed");
terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
return -1;
}
int32_t code = mnodeRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
dnodeReleaseMnode(pMnode);
return code;
}
\ No newline at end of file
此差异已折叠。
......@@ -131,7 +131,7 @@ typedef struct SMnodeObj {
int64_t roleTime;
int64_t createdTime;
int64_t updateTime;
SDnodeObj *pDnode;
SDnodeObj *pDnd;
} SMnodeObj;
typedef struct {
......@@ -215,7 +215,7 @@ typedef struct SDbObj {
typedef struct {
int32_t dnodeId;
int8_t role;
SDnodeObj *pDnode;
SDnodeObj *pDnd;
} SVnodeGid;
typedef struct SVgObj {
......
......@@ -83,7 +83,7 @@ static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) {
pMnode->replica = pOptions->replica;
pMnode->selfIndex = pOptions->selfIndex;
memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOptions->pServer;
pMnode->pServer = pOptions->pDnode;
pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp;
......@@ -187,7 +187,7 @@ SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
}
if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
mnodeCleanupMsg(pMnode, pMsg);
mnodeCleanupMsg(pMsg);
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
return NULL;
......@@ -199,7 +199,7 @@ SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return pMsg;
}
void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
void mnodeCleanupMsg(SMnodeMsg *pMsg) {
if (pMsg->pUser != NULL) {
sdbRelease(pMsg->pUser);
}
......@@ -210,7 +210,7 @@ void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) {
if (!mnodeIsMaster()) {
mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true);
mnodeCleanupMsg(NULL, pMsg);
mnodeCleanupMsg(pMsg);
return;
}
......
......@@ -54,10 +54,11 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *, SEpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount;
void *parent;
void *idPool; // handle to ID pool
void *tmrCtrl; // handle to timer
SHashObj *hash; // handle returned by hash utility
......@@ -260,6 +261,7 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc->spi = pInit->spi;
pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp;
pRpc->parent = pInit->parent;
pRpc->refCount = 1;
atomic_add_fetch_32(&tsRpcNum, 1);
......@@ -744,7 +746,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pConn->user[0] == 0) {
terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
} else {
terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
terrno = (*pRpc->afp)(pRpc->parent, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
}
if (terrno != 0) {
......@@ -1024,8 +1026,8 @@ static void doRpcReportBrokenLinkToServer(void *param, void *id) {
SRpcMsg *pRpcMsg = (SRpcMsg *)(param);
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
SRpcInfo *pRpc = pConn->pRpc;
(*(pRpc->cfp))(pRpcMsg, NULL);
free(pRpcMsg);
(*(pRpc->cfp))(pRpc->parent, pRpcMsg, NULL);
free(pRpcMsg);
}
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc;
......@@ -1137,9 +1139,9 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
// for asynchronous API
SEpSet *pEpSet = NULL;
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect)
pEpSet = &pContext->epSet;
pEpSet = &pContext->epSet;
(*pRpc->cfp)(pMsg, pEpSet);
(*pRpc->cfp)(pRpc->parent, pMsg, pEpSet);
}
// free the request message
......@@ -1155,15 +1157,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
rpcMsg.pCont = pHead->content;
rpcMsg.msgType = pHead->msgType;
rpcMsg.code = pHead->code;
if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.code = pHead->code;
if (rpcIsReq(pHead->msgType)) {
rpcMsg.ahandle = pConn->ahandle;
rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests
// notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL);
(*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
} else {
// it's a response
rpcMsg.handle = pContext;
......
......@@ -78,6 +78,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, "Memory corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted")
TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
......@@ -235,20 +236,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_PARTITONS, "Invalid topic partito
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
// dnode
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, "Message not processed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, "Dnode out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_NOT_MATCH_DNODE, "Mnode Id not match Dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED, "Mnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_NOT_DEPLOYED, "Mnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_READ_MNODE_FILE_ERROR, "Read mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_WRITE_MNODE_FILE_ERROR, "Write mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, "No permission for disk files in dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_TOO_MANY_VNODES, "Too many vnode directories")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_EXITING, "Dnode is exiting")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_VNODE_FILE_ERROR, "Parse vnodes.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_PARSE_DNODE_FILE_ERROR, "Parse dnodes.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, "Invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_READ_FILE_ERROR, "Read dnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_DNODE_WRITE_FILE_ERROR, "Write dnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED, "Mnode already deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_NOT_DEPLOYED, "Mnode not deployed")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_INVALID, "Mnode Id invalid")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_ID_NOT_FOUND, "Mnode Id not found")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_READ_FILE_ERROR, "Read mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR, "Write mnode.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnode directories")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_READ_FILE_ERROR, "Read vnodes.json error")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR, "Write vnodes.json error")
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册