提交 2f1bb281 编写于 作者: S Shengliang Guan

add daemon

上级 7f1a4b9c
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DNODE_H_
#define _TD_DNODE_H_
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode;
/* ------------------------ SDnode ------------------------ */
/**
* @brief Initialize and start the dnode.
*
* @param cfgPath Config file path.
* @return SDnode* The dnode object.
*/
SDnode *dnodeInit(const char *cfgPath);
/**
* @brief Stop and cleanup dnode.
*
* @param pDnode The dnode object to close.
*/
void dnodeCleanup(SDnode *pDnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DNODE_H_*/
...@@ -20,17 +20,16 @@ ...@@ -20,17 +20,16 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { MN_MSG_TYPE_WRITE = 1, MN_MSG_TYPE_APPLY, MN_MSG_TYPE_SYNC, MN_MSG_TYPE_READ } EMnMsgType; /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode;
typedef struct SMnode SMnode;
typedef struct SMnodeMsg SMnodeMsg; typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg);
typedef struct { typedef struct SMnodeLoad {
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
} SMnodeCfg;
typedef struct {
int64_t numOfDnode; int64_t numOfDnode;
int64_t numOfMnode; int64_t numOfMnode;
int64_t numOfVgroup; int64_t numOfVgroup;
...@@ -43,38 +42,126 @@ typedef struct { ...@@ -43,38 +42,126 @@ typedef struct {
int64_t compStorage; int64_t compStorage;
} SMnodeLoad; } SMnodeLoad;
typedef struct SMnode SMnode;
typedef struct SServer SServer;
typedef void (*SendMsgToDnodeFp)(SServer *pServer, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SServer *pServer, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SServer *pServer, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToMnodeQFp)(SServer *pServer, SMnodeMsg *pMsg);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
struct SServer *pServer;
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
} SMnodePara; } SMnodeOptions;
/* ------------------------ SMnode ------------------------ */
/**
* @brief Open a mnode.
*
* @param path Path of the mnode
* @param pOptions Options of the mnode
* @return SMnode* The mnode object
*/
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions);
/**
* @brief Close a mnode
*
* @param pMnode The mnode object to close
*/
void mnodeClose(SMnode *pMnode);
/**
* @brief Close a mnode
*
* @param pMnode The mnode object to close
* @param pOptions Options of the mnode
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions);
/**
* @brief Drop a mnode.
*
* @param path Path of the mnode.
*/
void mnodeDestroy(const char *path);
/**
* @brief Get mnode statistics info
*
* @param pMnode The mnode object
* @param pLoad Statistics of the mnode.
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
SMnode* mnodeCreate(SMnodePara para); /**
void mnodeCleanup(); * @brief Get user authentication info
*
* @param pMnode The mnode object
* @param user
* @param spi
* @param encrypt
* @param secret
* @param ckey
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mnodeDeploy(SMnodeCfg *pCfg); /**
void mnodeUnDeploy(); * @brief Initialize mnode msg
int32_t mnodeStart(SMnodeCfg *pCfg); *
int32_t mnodeAlter(SMnodeCfg *pCfg); * @param pMnode The mnode object
void mnodeStop(); * @param pMsg The request rpc msg
* @return int32_t The created mnode msg
*/
SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg);
int32_t mnodeGetLoad(SMnodeLoad *pLoad); /**
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); * @brief Cleanup mnode msg
*
* @param pMnode The mnode object
* @param pMsg The request msg
*/
void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg);
SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg); /**
void mnodeCleanupMsg(SMnodeMsg *pMsg); * @brief Process the read request
void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType); *
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg);
/**
* @brief Process the write request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg);
/**
* @brief Process the sync request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg);
/**
* @brief Process the apply request
*
* @param pMnode The mnode object
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -184,11 +184,11 @@ typedef struct { ...@@ -184,11 +184,11 @@ typedef struct {
SRpcMsg rpcMsg[]; SRpcMsg rpcMsg[];
} SVnodeMsg; } SVnodeMsg;
typedef struct SServer SServer; typedef struct SDnode SDnode;
typedef void (*SendMsgToDnodeFp)(SServer *pServer, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SServer *pServer, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SServer *pServer, struct SRpcMsg *rpcMsg, bool forShell); typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg, bool forShell);
typedef int32_t (*PutMsgToVnodeQFp)(SServer *pServer, int32_t vgId, SVnodeMsg *pMsg); typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg);
typedef struct { typedef struct {
PutMsgToVnodeQFp putMsgToApplyQueueFp; PutMsgToVnodeQFp putMsgToApplyQueueFp;
......
add_subdirectory(mnode) add_subdirectory(mnode)
add_subdirectory(vnode) add_subdirectory(vnode)
add_subdirectory(qnode) add_subdirectory(qnode)
add_subdirectory(mgmt) add_subdirectory(mgmt)
\ No newline at end of file
aux_source_directory(src DNODE_SRC) add_subdirectory(daemon)
add_executable(taosd ${DNODE_SRC}) add_subdirectory(impl)
target_link_libraries( \ No newline at end of file
taosd
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
PUBLIC wal
PUBLIC sync
PUBLIC taos
)
target_include_directories(
taosd
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
aux_source_directory(src DAEMON_SRC)
add_executable(taosd ${DAEMON_SRC})
target_link_libraries(
taosd
PUBLIC dnode
PUBLIC util
PUBLIC os
)
...@@ -14,10 +14,14 @@ ...@@ -14,10 +14,14 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dnodeInt.h" #include "dnode.h"
#include "os.h"
#include "ulog.h"
static bool stop = false; static bool stop = false;
static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; } static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; }
static void setSignalHandler() { static void setSignalHandler() {
taosSetSignal(SIGTERM, sigintHandler); taosSetSignal(SIGTERM, sigintHandler);
taosSetSignal(SIGHUP, sigintHandler); taosSetSignal(SIGHUP, sigintHandler);
...@@ -27,20 +31,23 @@ static void setSignalHandler() { ...@@ -27,20 +31,23 @@ static void setSignalHandler() {
} }
int main(int argc, char const *argv[]) { int main(int argc, char const *argv[]) {
setSignalHandler(); const char *path = "/etc/taos";
int32_t code = dnodeInit(); SDnode *pDnode = dnodeInit(path);
if (code != 0) { if (pDnode == NULL) {
dInfo("Failed to start TDengine, please check the log at:%s", tsLogDir); uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
uInfo("Started TDengine service successfully.");
setSignalHandler();
while (!stop) { while (!stop) {
taosMsleep(100); taosMsleep(100);
} }
dInfo("TDengine is shut down!"); uInfo("TDengine is shut down!");
dnodeCleanup(); dnodeCleanup(pDnode);
return 0; return 0;
} }
aux_source_directory(src DNODE_SRC)
add_library(dnode ${DNODE_SRC})
target_link_libraries(
dnode
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
PUBLIC wal
PUBLIC sync
PUBLIC taos
)
target_include_directories(
dnode
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mgmt"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
...@@ -21,8 +21,8 @@ extern "C" { ...@@ -21,8 +21,8 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
int32_t dnodeInitDnode(); int32_t dnodeInitDnode(SDnode *pDnode);
void dnodeCleanupDnode(); void dnodeCleanupDnode(SDnode *pDnode);
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
...@@ -30,7 +30,7 @@ int64_t dnodeGetClusterId(); ...@@ -30,7 +30,7 @@ int64_t dnodeGetClusterId();
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetMnodeEpSetForPeer(SEpSet *epSet); void dnodeGetMnodeEpSetForPeer(SEpSet *epSet);
void dnodeGetMnodeEpSetForShell(SEpSet *epSet); void dnodeGetMnodeEpSetForShell(SEpSet *epSet);
void dnodeSendRedirectMsg(SServer *pServer, SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(SDnode *pDnode, SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,33 +22,73 @@ extern "C" { ...@@ -22,33 +22,73 @@ extern "C" {
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "thash.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tthread.h"
#include "ttime.h"
extern int32_t dDebugFlag; extern int32_t dDebugFlag;
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("SRV FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("SRV ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} #define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("SRV WARN ", 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", 255, __VA_ARGS__); }} #define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("SRV ", 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("SRV ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; typedef enum { DN_STAT_INIT, DN_STAT_RUNNING, DN_STAT_STOPPED } EStat;
typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet); typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet);
typedef struct SServer { typedef struct {
} SServer; char *dnode;
char *mnode;
char *vnodes;
} SDnodeDir;
int32_t dnodeInit(); typedef struct {
void dnodeCleanup(); int32_t dnodeId;
int64_t clusterId;
SDnodeEps *dnodeEps;
SHashObj *dnodeHash;
SEpSet mnodeEpSetForShell;
SEpSet mnodeEpSetForPeer;
char *file;
uint32_t rebootTime;
int8_t dropped;
int8_t threadStop;
pthread_t *threadId;
pthread_mutex_t mutex;
} SDnodeDnode;
EDnStat dnodeGetRunStat(); typedef struct {
void dnodeSetRunStat(EDnStat stat); } SDnodeMnode;
void dnodeReportStartup(char *name, char *desc); typedef struct {
void dnodeReportStartupFinished(char *name, char *desc); } SDnodeVnodes;
void dnodeGetStartup(SStartupMsg *);
typedef struct {
void *peerRpc;
void *shellRpc;
void *clientRpc;
} SDnodeTrans;
typedef struct SDnode {
EStat stat;
SDnodeDir dir;
SDnodeDnode dnode;
SDnodeVnodes vnodes;
SDnodeMnode mnode;
SDnodeTrans trans;
SStartupMsg startup;
} SDnode;
EStat dnodeGetStat(SDnode *pDnode);
void dnodeSetStat(SDnode *pDnode, EStat stat);
char *dnodeStatStr(EStat stat);
void dnodeReportStartup(SDnode *pDnode, char *name, char *desc);
void dnodeGetStartup(SDnode *pDnode, SStartupMsg *pStartup);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -23,8 +23,8 @@ extern "C" { ...@@ -23,8 +23,8 @@ extern "C" {
int32_t dnodeInitTrans(); int32_t dnodeInitTrans();
void dnodeCleanupTrans(); void dnodeCleanupTrans();
void dnodeSendMsgToMnode(SServer *pServer, SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SDnode *pDnode, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SServer *pServer, SEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SDnode *pDnode, SEpSet *epSet, SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,48 +25,53 @@ ...@@ -25,48 +25,53 @@
#include "tstep.h" #include "tstep.h"
#include "wal.h" #include "wal.h"
static struct { EStat dnodeGetStat(SDnode *pDnode) { return pDnode->stat; }
SStartupMsg startup;
EDnStat runStat;
SSteps *steps;
} tsInt;
EDnStat dnodeGetRunStat() { return tsInt.runStat; } void dnodeSetStat(SDnode *pDnode, EStat stat) {
dDebug("dnode stat set from %s to %s", dnodeStatStr(pDnode->stat), dnodeStatStr(stat));
pDnode->stat = stat;
}
void dnodeSetRunStat(EDnStat stat) { char *dnodeStatStr(EStat stat) {
dDebug("runstat set to %d", stat); switch (stat) {
tsInt.runStat = stat; case DN_STAT_INIT:
return "init";
case DN_STAT_RUNNING:
return "running";
case DN_STAT_STOPPED:
return "stopped";
default:
return "unknown";
}
} }
void dnodeReportStartup(char *name, char *desc) { void dnodeReportStartup(SDnode *pDnode, char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup; SStartupMsg *pStartup = &pDnode->startup;
tstrncpy(pStartup->name, name, strlen(pStartup->name)); tstrncpy(pStartup->name, name, strlen(pStartup->name));
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc)); tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 0; pStartup->finished = 0;
} }
void dnodeReportStartupFinished(char *name, char *desc) { void dnodeGetStartup(SDnode *pDnode, SStartupMsg *pStartup) {
SStartupMsg *pStartup = &tsInt.startup; memcpy(pStartup, &pDnode->startup, sizeof(SStartupMsg);
tstrncpy(pStartup->name, name, strlen(pStartup->name)); pStartup->finished = (dnodeGetStat(pDnode) == DN_STAT_RUNNING);
tstrncpy(pStartup->desc, desc, strlen(pStartup->desc));
pStartup->finished = 1;
} }
void dnodeGetStartup(SStartupMsg *pStartup) { memcpy(pStartup, &tsInt.startup, sizeof(SStartupMsg)); } static int32_t dnodeCheckRunning(char *dataDir) {
char filepath[PATH_MAX] = {0};
static int32_t dnodeCheckRunning(char *dir) { snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
char filepath[256] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dir);
FileFd fd = taosOpenFileCreateWriteTrunc(filepath); FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
if (fd < 0) { if (fd < 0) {
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno)); dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
int32_t ret = taosLockFile(fd); int32_t ret = taosLockFile(fd);
if (ret != 0) { if (ret != 0) {
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno)); dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(fd); taosCloseFile(fd);
return -1; return -1;
} }
...@@ -74,35 +79,49 @@ static int32_t dnodeCheckRunning(char *dir) { ...@@ -74,35 +79,49 @@ static int32_t dnodeCheckRunning(char *dir) {
return 0; return 0;
} }
static int32_t dnodeInitDir() { static int32_t dnodeInitDisk(SDnode *pDnode, char *dataDir) {
sprintf(tsMnodeDir, "%s/mnode", tsDataDir); char path[PATH_MAX];
sprintf(tsVnodeDir, "%s/vnode", tsDataDir); snprintf(path, PATH_MAX, "%s/mnode", dataDir);
sprintf(tsDnodeDir, "%s/dnode", tsDataDir); pDnode->dir.mnode = strdup(path);
sprintf(path, PATH_MAX, "%s/vnode", dataDir);
pDnode->dir.vnodes = strdup(path);
sprintf(path, PATH_MAX, "%s/dnode", dataDir);
pDnode->dir.dnode = strdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
dError("failed to malloc dir object");
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (!taosMkDir(tsDnodeDir)) { if (!taosMkDir(pDnode->dir.dnode)) {
dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); dError("failed to create dir:%s since %s", pDnode->dir.dnode, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (!taosMkDir(tsMnodeDir)) { if (!taosMkDir(pDnode->dir.mnode)) {
dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); dError("failed to create dir:%s since %s", pDnode->dir.mnode, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (!taosMkDir(tsVnodeDir)) { if (!taosMkDir(pDnode->dir.vnodes)) {
dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno)); dError("failed to create dir:%s since %s", pDnode->dir.vnodes, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (dnodeCheckRunning(tsDnodeDir) != 0) { if (dnodeCheckRunning(dataDir) != 0) {
return -1; return -1;
} }
return 0; return 0;
} }
static int32_t dnodeInitMain() { static int32_t dnodeInitEnv(SDnode *pDnode, const char *cfgPath) {
tscEmbedded = 1;
taosIgnSIGPIPE(); taosIgnSIGPIPE();
taosBlockSIGPIPE(); taosBlockSIGPIPE();
taosResolveCRC(); taosResolveCRC();
...@@ -118,7 +137,8 @@ static int32_t dnodeInitMain() { ...@@ -118,7 +137,8 @@ static int32_t dnodeInitMain() {
char temp[TSDB_FILENAME_LEN]; char temp[TSDB_FILENAME_LEN];
sprintf(temp, "%s/taosdlog", tsLogDir); sprintf(temp, "%s/taosdlog", tsLogDir);
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
printf("failed to init log file\n"); dError("failed to init log file\n");
return -1;
} }
if (!taosReadGlobalCfg()) { if (!taosReadGlobalCfg()) {
...@@ -127,55 +147,114 @@ static int32_t dnodeInitMain() { ...@@ -127,55 +147,114 @@ static int32_t dnodeInitMain() {
return -1; return -1;
} }
dInfo("start to initialize TDengine");
taosInitNotes(); taosInitNotes();
if (taosCheckGlobalCfg() != 0) { if (taosCheckGlobalCfg() != 0) {
dError("TDengine check global config failed");
return -1; return -1;
} }
dnodeInitDir(); if (dnodeInitDisk(pDnode, tsDataDir) != 0) {
dError("TDengine failed to init directory");
return -1;
}
return 0; return 0;
} }
static void dnodeCleanupMain() { static void dnodeCleanupEnv(SDnode *pDnode) {
taos_cleanup(); if (pDnode->dir.mnode != NULL) {
tfree(pDnode->dir.mnode);
}
if (pDnode->dir.vnodes != NULL) {
tfree(pDnode->dir.vnodes);
}
if (pDnode->dir.dnode != NULL) {
tfree(pDnode->dir.dnode);
}
taosCloseLog(); taosCloseLog();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
} }
int32_t dnodeInit() { SDnode *dnodeInit(const char *cfgPath) {
SSteps *steps = taosStepInit(10, dnodeReportStartup); SDnode *pDnode = calloc(1, sizeof(pDnode));
if (steps == NULL) return -1; if (pDnode == NULL) {
#if 1 dError("failed to create dnode object");
dnodeSetRunStat(DN_RUN_STAT_RUNNING); terrno = TSDB_CODE_OUT_OF_MEMORY;
#endif return NULL;
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); }
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", NULL, NULL); dInfo("start to initialize TDengine");
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); dnodeSetStat(pDnode, DN_STAT_INIT);
//taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
taosStepAdd(steps, "dnode-dnode", dnodeInitDnode, dnodeCleanupDnode); if (dnodeInitEnv(pDnode, cfgPath) != 0) {
taosStepAdd(steps, "dnode-vnodes", dnodeInitVnodes, dnodeCleanupVnodes); dError("failed to init env");
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, dnodeCleanupMnode); dnodeCleanup(pDnode);
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); return NULL;
}
tsInt.steps = steps;
taosStepExec(tsInt.steps); if (rpcInit() != 0) {
dError("failed to init rpc env");
dnodeSetRunStat(DN_RUN_STAT_RUNNING); dnodeCleanup(pDnode);
dnodeReportStartupFinished("TDengine", "initialized successfully"); return NULL;
}
if (walInit() != 0) {
dError("failed to init wal env");
dnodeCleanup(pDnode);
return NULL;
}
if (dnodeInitDnode(pDnode) != 0) {
dError("failed to init dnode");
dnodeCleanup(pDnode);
return NULL;
}
if (dnodeInitVnodes(pDnode) != 0) {
dError("failed to init vnodes");
dnodeCleanup(pDnode);
return NULL;
}
if (dnodeInitMnode(pDnode) != 0) {
dError("failed to init mnode");
dnodeCleanup(pDnode);
return NULL;
}
if (dnodeInitTrans(pDnode) != 0) {
dError("failed to init transport");
dnodeCleanup(pDnode);
return NULL;
}
dnodeSetStat(pDnode, DN_STAT_RUNNING);
dnodeReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
return 0; return 0;
} }
void dnodeCleanup() { void dnodeCleanup(SDnode *pDnode) {
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) { if (dnodeGetStat(pDnode) == DN_STAT_STOPPED) {
dnodeSetRunStat(DN_RUN_STAT_STOPPED); dError("dnode is shutting down");
taosStepCleanup(tsInt.steps); return;
tsInt.steps = NULL;
} }
dInfo("start to cleanup TDengine");
dnodeSetStat(pDnode, DN_STAT_STOPPED);
dnodeCleanupTrans(pDnode);
dnodeCleanupMnode(pDnode);
dnodeCleanupVnodes(pDnode);
dnodeCleanupDnode(pDnode);
walCleanUp();
rpcCleanup();
dInfo("TDengine is cleaned up successfully");
dnodeCleanupEnv(pDnode);
free(pDnode);
} }
...@@ -51,19 +51,24 @@ static void dnodeFreeMnodeApplyQueue(); ...@@ -51,19 +51,24 @@ static void dnodeFreeMnodeApplyQueue();
static int32_t dnodeAllocMnodeSyncQueue(); static int32_t dnodeAllocMnodeSyncQueue();
static void dnodeFreeMnodeSyncQueue(); static void dnodeFreeMnodeSyncQueue();
static int32_t dnodeAcquireMnode() { static SMnode *dnodeAcquireMnode() {
SMnode *pMnode = NULL;
taosRLockLatch(&tsMnode.latch); taosRLockLatch(&tsMnode.latch);
int32_t code = tsMnode.deployed ? 0 : TSDB_CODE_DND_MNODE_NOT_DEPLOYED; if (tsMnode.deployed) {
if (code == 0) {
atomic_add_fetch_32(&tsMnode.refCount, 1); atomic_add_fetch_32(&tsMnode.refCount, 1);
pMnode = tsMnode.pMnode;
} }
taosRUnLockLatch(&tsMnode.latch); taosRUnLockLatch(&tsMnode.latch);
return code; return pMnode;
} }
static void dnodeReleaseMnode() { atomic_sub_fetch_32(&tsMnode.refCount, 1); } static void dnodeReleaseMnode(SMnode *pMnode) {
taosRLockLatch(&tsMnode.latch);
atomic_sub_fetch_32(&tsMnode.refCount, 1);
taosRUnLockLatch(&tsMnode.latch);
}
static int32_t dnodeReadMnodeFile() { static int32_t dnodeReadMnodeFile() {
int32_t code = TSDB_CODE_DND_READ_MNODE_FILE_ERROR; int32_t code = TSDB_CODE_DND_READ_MNODE_FILE_ERROR;
...@@ -503,12 +508,12 @@ static void dnodeCleanupMnodeSyncWorker() { tWorkerCleanup(&tsMnode.syncPool); } ...@@ -503,12 +508,12 @@ static void dnodeCleanupMnodeSyncWorker() { tWorkerCleanup(&tsMnode.syncPool); }
static int32_t dnodeInitMnodeModule() { static int32_t dnodeInitMnodeModule() {
taosInitRWLatch(&tsMnode.latch); taosInitRWLatch(&tsMnode.latch);
SMnodePara para; SMnodeOptions para;
para.dnodeId = dnodeGetDnodeId(); para.dnodeId = dnodeGetDnodeId();
para.clusterId = dnodeGetClusterId(); para.clusterId = dnodeGetClusterId();
para.sendMsgToDnodeFp = dnodeSendMsgToDnode; para.sendMsgToDnodeFp = dnodeSendMsgToDnode;
para.sendMsgToMnodeFp = dnodeSendMsgToMnode; para.sendMsgToMnodeFp = dnodeSendMsgToMnode;
para.sendMsgToMnodeFp = dnodeSendRedirectMsg; para.sendRedirectMsgFp = dnodeSendRedirectMsg;
tsMnode.pMnode = mnodeCreate(para); tsMnode.pMnode = mnodeCreate(para);
if (tsMnode.pMnode != NULL) { if (tsMnode.pMnode != NULL) {
...@@ -517,7 +522,7 @@ static int32_t dnodeInitMnodeModule() { ...@@ -517,7 +522,7 @@ static int32_t dnodeInitMnodeModule() {
return 0; return 0;
} }
static void dnodeCleanupMnodeModule() { mnodeCleanup(); } static void dnodeCleanupMnodeModule() { mnodeDrop(NULL); }
static bool dnodeNeedDeployMnode() { static bool dnodeNeedDeployMnode() {
if (dnodeGetDnodeId() > 0) return false; if (dnodeGetDnodeId() > 0) return false;
...@@ -590,13 +595,14 @@ void dnodeCleanupMnode() { ...@@ -590,13 +595,14 @@ void dnodeCleanupMnode() {
} }
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t code = dnodeAcquireMnode(); SMnode *pMnode = dnodeAcquireMnode();
if (code != 0) { if (pMnode == NULL) {
dTrace("failed to get user auth since mnode not deployed"); dTrace("failed to get user auth since mnode not deployed");
return code; terrno = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
return -1;
} }
code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); int32_t code = mnodeRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
dnodeReleaseMnode(); dnodeReleaseMnode(pMnode);
return code; return code;
} }
\ No newline at end of file
...@@ -135,7 +135,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -135,7 +135,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
return; return;
} }
if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { if (dnodeGetStat() != DN_STAT_RUNNING) {
rspMsg.code = TSDB_CODE_APP_NOT_READY; rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -193,7 +193,7 @@ static void dnodeCleanupPeerServer() { ...@@ -193,7 +193,7 @@ static void dnodeCleanupPeerServer() {
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) { static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { if (dnodeGetStat() == DN_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return; if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -248,13 +248,13 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -248,13 +248,13 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle}; SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { if (dnodeGetStat() == DN_STAT_STOPPED) {
dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_EXITING; rspMsg.code = TSDB_CODE_DND_EXITING;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
} else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { } else if (dnodeGetStat() != DN_STAT_RUNNING) {
dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_APP_NOT_READY; rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
...@@ -382,13 +382,13 @@ void dnodeCleanupTrans() { ...@@ -382,13 +382,13 @@ void dnodeCleanupTrans() {
dnodeCleanupClient(); dnodeCleanupClient();
} }
void dnodeSendMsgToDnode(SServer *pServer, SEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SDnode *pDnode, SEpSet *epSet, SRpcMsg *rpcMsg) {
#if 0 #if 0
rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
#endif #endif
} }
void dnodeSendMsgToMnode(SServer *pServer, SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SDnode *pDnode, SRpcMsg *rpcMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet); dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(NULL, &epSet, rpcMsg); dnodeSendMsgToDnode(NULL, &epSet, rpcMsg);
......
...@@ -815,7 +815,7 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -815,7 +815,7 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
} }
} }
static int32_t dnodePutMsgIntoVnodeApplyQueue(SServer *pServer, int32_t vgId, SVnodeMsg *pMsg) { static int32_t dnodePutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg) {
SVnodeObj *pVnode = dnodeAcquireVnode(vgId); SVnodeObj *pVnode = dnodeAcquireVnode(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
return terrno; return terrno;
......
...@@ -32,21 +32,22 @@ typedef struct SMnodeBak { ...@@ -32,21 +32,22 @@ typedef struct SMnodeBak {
tmr_h timer; tmr_h timer;
SSteps *pInitSteps; SSteps *pInitSteps;
SSteps *pStartSteps; SSteps *pStartSteps;
SMnodePara para; SMnodeOptions para;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnodeBak; } SMnodeBak;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
tmr_h timer; int8_t replica;
SSteps *pInitSteps; int8_t selfIndex;
SSteps *pStartSteps; SReplica replicas[TSDB_MAX_REPLICA];
SMnodePara para; tmr_h timer;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; SSteps *pInitSteps;
SSteps *pStartSteps;
struct SSdb *pSdb; struct SSdb *pSdb;
struct SServer *pServer; struct SDnode *pServer;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
PutMsgToMnodeQFp putMsgToApplyMsgFp; PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
......
...@@ -77,40 +77,20 @@ static void mnodeCleanupTimer() { ...@@ -77,40 +77,20 @@ static void mnodeCleanupTimer() {
tmr_h mnodeGetTimer() { return tsMint.timer; } tmr_h mnodeGetTimer() { return tsMint.timer; }
static int32_t mnodeSetPara(SMnode *pMnode, SMnodePara para) { static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) {
pMnode->dnodeId = para.dnodeId; pMnode->dnodeId = pOptions->dnodeId;
pMnode->clusterId = para.clusterId; pMnode->clusterId = pOptions->clusterId;
pMnode->putMsgToApplyMsgFp = para.putMsgToApplyMsgFp; pMnode->replica = pOptions->replica;
pMnode->sendMsgToDnodeFp = para.sendMsgToDnodeFp; pMnode->selfIndex = pOptions->selfIndex;
pMnode->sendMsgToMnodeFp = para.sendMsgToMnodeFp; memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->sendRedirectMsgFp = para.sendRedirectMsgFp; pMnode->pServer = pOptions->pServer;
pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL) { pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp;
terrno = TSDB_CODE_MND_APP_ERROR; pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp;
return -1; pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp;
}
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
if (pMnode->sendMsgToMnodeFp == NULL) { pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
}
if (pMnode->sendRedirectMsgFp == NULL) {
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
}
if (pMnode->putMsgToApplyMsgFp == NULL) {
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
}
if (pMnode->dnodeId < 0) {
terrno = TSDB_CODE_MND_APP_ERROR;
return -1;
}
if (pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_APP_ERROR; terrno = TSDB_CODE_MND_APP_ERROR;
return -1; return -1;
} }
...@@ -156,12 +136,12 @@ static int32_t mnodeAllocStartSteps() { ...@@ -156,12 +136,12 @@ static int32_t mnodeAllocStartSteps() {
return 0; return 0;
} }
SMnode *mnodeCreate(SMnodePara para) { SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
SMnode *pMnode = calloc(1, sizeof(SMnode)); SMnode *pMnode = calloc(1, sizeof(SMnode));
if (mnodeSetPara(pMnode, para) != 0) { if (mnodeSetOptions(pMnode, pOptions) != 0) {
free(pMnode); free(pMnode);
mError("failed to init mnode para since %s", terrstr()); mError("failed to init mnode options since %s", terrstr());
return NULL; return NULL;
} }
...@@ -175,35 +155,31 @@ SMnode *mnodeCreate(SMnodePara para) { ...@@ -175,35 +155,31 @@ SMnode *mnodeCreate(SMnodePara para) {
return NULL; return NULL;
} }
taosStepExec(tsMint.pInitSteps); taosStepExec(tsMint.pInitSteps);
return NULL;
}
void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); }
int32_t mnodeDeploy(SMnodeCfg *pCfg) {
if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) { if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) {
if (sdbDeploy() != 0) { if (sdbDeploy() != 0) {
mError("failed to deploy sdb since %s", terrstr()); mError("failed to deploy sdb since %s", terrstr());
return -1; return NULL;
} else {
mInfo("mnode is deployed");
} }
} }
mDebug("mnode is deployed"); taosStepExec(tsMint.pStartSteps);
return 0;
}
void mnodeUnDeploy() { sdbUnDeploy(); } return pMnode;
}
int32_t mnodeStart(SMnodeCfg *pCfg) { return taosStepExec(tsMint.pStartSteps); } void mnodeClose(SMnode *pMnode) { free(pMnode); }
int32_t mnodeAlter(SMnodeCfg *pCfg) { return 0; } int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) { return 0; }
void mnodeStop() { taosStepCleanup(tsMint.pStartSteps); } void mnodeDestroy(const char *path) { sdbUnDeploy(); }
int32_t mnodeGetLoad(SMnodeLoad *pLoad) { return 0; } int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; }
SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -211,7 +187,7 @@ SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { ...@@ -211,7 +187,7 @@ SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) {
} }
if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) { if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
mnodeCleanupMsg(pMsg); mnodeCleanupMsg(pMnode, pMsg);
mError("can not get user from conn:%p", pMsg->rpcMsg.handle); mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
return NULL; return NULL;
...@@ -223,7 +199,7 @@ SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) { ...@@ -223,7 +199,7 @@ SMnodeMsg *mnodeInitMsg(SRpcMsg *pRpcMsg) {
return pMsg; return pMsg;
} }
void mnodeCleanupMsg(SMnodeMsg *pMsg) { void mnodeCleanupMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
if (pMsg->pUser != NULL) { if (pMsg->pUser != NULL) {
sdbRelease(pMsg->pUser); sdbRelease(pMsg->pUser);
} }
...@@ -232,6 +208,12 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) { ...@@ -232,6 +208,12 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) {
} }
static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) { static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) {
if (!mnodeIsMaster()) {
mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true);
mnodeCleanupMsg(NULL, pMsg);
return;
}
int32_t msgType = pMsg->rpcMsg.msgType; int32_t msgType = pMsg->rpcMsg.msgType;
MnodeRpcFp fp = tsMint.msgFp[msgType]; MnodeRpcFp fp = tsMint.msgFp[msgType];
...@@ -250,25 +232,13 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { ...@@ -250,25 +232,13 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) {
} }
} }
void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) { void mnodeProcessReadMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
if (!mnodeIsMaster()) {
mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true);
mnodeCleanupMsg(pMsg);
return;
}
switch (msgType) { void mnodeProcessWriteMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
case MN_MSG_TYPE_READ:
case MN_MSG_TYPE_WRITE: void mnodeProcessSyncMsg(SMnode *pMnode, SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
case MN_MSG_TYPE_SYNC:
mnodeProcessRpcMsg(pMsg); void mnodeProcessApplyMsg(SMnode *pMnode, SMnodeMsg *pMsg) {}
break;
case MN_MSG_TYPE_APPLY:
break;
default:
break;
}
}
#if 0 #if 0
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
int32_t mnodeInitAuth() { return 0; } int32_t mnodeInitAuth() { return 0; }
void mnodeCleanupAuth() {} void mnodeCleanupAuth() {}
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, TSDB_NETTEST_USER) == 0) { if (strcmp(user, TSDB_NETTEST_USER) == 0) {
char pass[32] = {0}; char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass); taosEncryptPass((uint8_t *)user, strlen(user), pass);
......
...@@ -174,7 +174,7 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) { ...@@ -174,7 +174,7 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) {
static void mnodeAddRuntimeInfo(SBufferWriter* bw) { static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
SMnodeLoad load = {0}; SMnodeLoad load = {0};
if (mnodeGetLoad(&load) != 0) { if (mnodeGetLoad(NULL, &load) != 0) {
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册