提交 6ee88322 编写于 作者: S Shengliang Guan

TD-10431 fix compile errors in mnode

上级 a17d9811
......@@ -48,7 +48,7 @@ typedef struct {
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
struct SDnode *pDnode;
SDnode *pDnode;
PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
......@@ -63,14 +63,14 @@ typedef struct {
* @param pOption Option of the mnode
* @return SMnode* The mnode object
*/
SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption);
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption);
/**
* @brief Close a mnode
*
* @param pMnode The mnode object to close
*/
void mnodeClose(SMnode *pMnode);
void mndClose(SMnode *pMnode);
/**
* @brief Close a mnode
......@@ -79,14 +79,14 @@ void mnodeClose(SMnode *pMnode);
* @param pOption Options of the mnode
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption);
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption);
/**
* @brief Drop a mnode.
*
* @param path Path of the mnode.
*/
void mnodeDestroy(const char *path);
void mndDestroy(const char *path);
/**
* @brief Get mnode statistics info
......@@ -95,7 +95,7 @@ void mnodeDestroy(const char *path);
* @param pLoad Statistics of the mnode.
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
/**
* @brief Get user authentication info
......@@ -108,7 +108,7 @@ int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
* @param ckey
* @return int32_t 0 for success, -1 for failure
*/
int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
/**
* @brief Initialize mnode msg
......@@ -117,14 +117,14 @@ int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, c
* @param pMsg The request rpc msg
* @return int32_t The created mnode msg
*/
SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg);
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg);
/**
* @brief Cleanup mnode msg
*
* @param pMsg The request msg
*/
void mnodeCleanupMsg(SMnodeMsg *pMsg);
void mndCleanupMsg(SMnodeMsg *pMsg);
/**
* @brief Cleanup mnode msg
......@@ -132,7 +132,7 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg);
* @param pMsg The request msg
* @param code The error code
*/
void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code);
void mndSendRsp(SMnodeMsg *pMsg, int32_t code);
/**
* @brief Process the read request
......@@ -140,7 +140,7 @@ void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code);
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessReadMsg(SMnodeMsg *pMsg);
void mndProcessReadMsg(SMnodeMsg *pMsg);
/**
* @brief Process the write request
......@@ -148,7 +148,7 @@ void mnodeProcessReadMsg(SMnodeMsg *pMsg);
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessWriteMsg(SMnodeMsg *pMsg);
void mndProcessWriteMsg(SMnodeMsg *pMsg);
/**
* @brief Process the sync request
......@@ -156,7 +156,7 @@ void mnodeProcessWriteMsg(SMnodeMsg *pMsg);
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessSyncMsg(SMnodeMsg *pMsg);
void mndProcessSyncMsg(SMnodeMsg *pMsg);
/**
* @brief Process the apply request
......@@ -164,7 +164,7 @@ void mnodeProcessSyncMsg(SMnodeMsg *pMsg);
* @param pMsg The request msg
* @return int32_t 0 for success, -1 for failure
*/
void mnodeProcessApplyMsg(SMnodeMsg *pMsg);
void mndProcessApplyMsg(SMnodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -396,7 +396,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
return code;
}
SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOption);
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
code = terrno;
......@@ -409,8 +409,8 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
dError("failed to write mnode file since %s", terrstr());
code = terrno;
dndStopMnodeWorker(pDnode);
mnodeClose(pMnode);
mnodeDestroy(pDnode->dir.mnode);
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
terrno = code;
return code;
}
......@@ -432,7 +432,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
return -1;
}
if (mnodeAlter(pMnode, pOption) != 0) {
if (mndAlter(pMnode, pOption) != 0) {
dError("failed to alter mnode since %s", terrstr());
dndReleaseMnode(pDnode, pMnode);
return -1;
......@@ -467,8 +467,8 @@ static int32_t dndDropMnode(SDnode *pDnode) {
dndStopMnodeWorker(pDnode);
dndWriteMnodeFile(pDnode);
mnodeClose(pMnode);
mnodeDestroy(pDnode->dir.mnode);
mndClose(pMnode);
mndDestroy(pDnode->dir.mnode);
return 0;
}
......@@ -495,6 +495,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1;
}
return dndOpenMnode(pDnode, &option);
}
}
......@@ -554,13 +555,13 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
mnodeProcessReadMsg(pMsg);
mndProcessReadMsg(pMsg);
dndReleaseMnode(pDnode, pMnode);
} else {
mnodeSendRsp(pMsg, terrno);
mndSendRsp(pMsg, terrno);
}
mnodeCleanupMsg(pMsg);
mndCleanupMsg(pMsg);
}
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
......@@ -568,13 +569,13 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
mnodeProcessWriteMsg(pMsg);
mndProcessWriteMsg(pMsg);
dndReleaseMnode(pDnode, pMnode);
} else {
mnodeSendRsp(pMsg, terrno);
mndSendRsp(pMsg, terrno);
}
mnodeCleanupMsg(pMsg);
mndCleanupMsg(pMsg);
}
static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
......@@ -582,13 +583,13 @@ static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
mnodeProcessApplyMsg(pMsg);
mndProcessApplyMsg(pMsg);
dndReleaseMnode(pDnode, pMnode);
} else {
mnodeSendRsp(pMsg, terrno);
mndSendRsp(pMsg, terrno);
}
mnodeCleanupMsg(pMsg);
mndCleanupMsg(pMsg);
}
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
......@@ -596,26 +597,26 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
mnodeProcessSyncMsg(pMsg);
mndProcessSyncMsg(pMsg);
dndReleaseMnode(pDnode, pMnode);
} else {
mnodeSendRsp(pMsg, terrno);
mndSendRsp(pMsg, terrno);
}
mnodeCleanupMsg(pMsg);
mndCleanupMsg(pMsg);
}
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) {
assert(pQueue);
SMnodeMsg *pMsg = mnodeInitMsg(pMnode, pRpcMsg);
SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg);
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosWriteQitem(pQueue, pMsg) != 0) {
mnodeCleanupMsg(pMsg);
mndCleanupMsg(pMsg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......@@ -877,7 +878,7 @@ int32_t dndInitMnode(SDnode *pDnode) {
if (pMgmt->dropped) {
dInfo("mnode has been deployed and needs to be deleted");
mnodeDestroy(pDnode->dir.mnode);
mndDestroy(pDnode->dir.mnode);
return 0;
}
......@@ -920,7 +921,7 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc
return -1;
}
int32_t code = mnodeRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
int32_t code = mndRetriveAuth(pMnode, user, spi, encrypt, secret, ckey);
dndReleaseMnode(pDnode, pMnode);
return code;
}
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitAcct();
void mnodeCleanupAcct();
int32_t mndInitAcct();
void mndCleanupAcct();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitAuth();
void mnodeCleanupAuth();
int32_t mndInitAuth();
void mndCleanupAuth();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitBalance();
void mnodeCleanupBalance();
int32_t mndInitBalance();
void mndCleanupBalance();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitCluster();
void mnodeCleanupCluster();
int32_t mndInitCluster();
void mndCleanupCluster();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitDb();
void mnodeCleanupDb();
int32_t mndInitDb();
void mndCleanupDb();
#ifdef __cplusplus
}
......
......@@ -39,42 +39,34 @@ extern int32_t mDebugFlag;
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
// #define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
// #define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
// #define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#define mLError(...) {mError(__VA_ARGS__) }
#define mLWarn(...) {mWarn(__VA_ARGS__) }
#define mLInfo(...) {mInfo(__VA_ARGS__) }
typedef struct SClusterObj SClusterObj;
typedef struct SDnodeObj SDnodeObj;
typedef struct SMnodeObj SMnodeObj;
typedef struct SAcctObj SAcctObj;
typedef struct SUserObj SUserObj;
typedef struct SDbObj SDbObj;
typedef struct SVgObj SVgObj;
typedef struct SSTableObj SSTableObj;
typedef struct SFuncObj SFuncObj;
typedef struct SOperObj SOperObj;
typedef struct SClusterObj SClusterObj;
typedef struct SDnodeObj SDnodeObj;
typedef struct SMnodeObj SMnodeObj;
typedef struct SAcctObj SAcctObj;
typedef struct SUserObj SUserObj;
typedef struct SDbObj SDbObj;
typedef struct SVgObj SVgObj;
typedef struct SSTableObj SSTableObj;
typedef struct SFuncObj SFuncObj;
typedef struct SOperObj SOperObj;
typedef enum {
MN_AUTH_ACCT_START = 0,
MN_AUTH_ACCT_USER,
MN_AUTH_ACCT_DNODE,
MN_AUTH_ACCT_MNODE,
MN_AUTH_ACCT_DB,
MN_AUTH_ACCT_TABLE,
MN_AUTH_ACCT_MAX
} EMnAuthAcct;
MND_AUTH_ACCT_START = 0,
MND_AUTH_ACCT_USER,
MND_AUTH_ACCT_DNODE,
MND_AUTH_ACCT_MNODE,
MND_AUTH_ACCT_DB,
MND_AUTH_ACCT_TABLE,
MND_AUTH_ACCT_MAX
} EAuthAcct;
typedef enum {
MN_AUTH_OP_START = 0,
MN_AUTH_OP_CREATE_USER,
MN_AUTH_OP_ALTER_USER,
MN_AUTH_OP_DROP_USER,
MN_AUTH_MAX
} EMnAuthOp;
MND_AUTH_OP_START = 0,
MND_AUTH_OP_CREATE_USER,
MND_AUTH_OP_ALTER_USER,
MND_AUTH_OP_DROP_USER,
MND_AUTH_MAX
} EAuthOp;
typedef enum {
TRN_STAGE_PREPARE = 1,
......@@ -86,7 +78,6 @@ typedef enum {
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
typedef struct STrans {
int32_t id;
ETrnStage stage;
......@@ -99,7 +90,6 @@ typedef struct STrans {
SArray *undoActions;
} STrans;
typedef struct SClusterObj {
int64_t id;
char uid[TSDB_CLUSTER_ID_LEN];
......@@ -202,6 +192,7 @@ typedef struct SDbObj {
int64_t createdTime;
int64_t updateTime;
SDbCfg cfg;
int64_t uid;
int8_t status;
int32_t numOfVgroups;
int32_t numOfTables;
......@@ -240,13 +231,13 @@ typedef struct SVgObj {
} SVgObj;
typedef struct SSTableObj {
char tableId[TSDB_TABLE_NAME_LEN];
uint64_t uid;
int64_t createdTime;
int64_t updateTime;
int32_t numOfColumns; // used by normal table
int32_t numOfTags;
SSchema * schema;
char tableId[TSDB_TABLE_NAME_LEN];
uint64_t uid;
int64_t createdTime;
int64_t updateTime;
int32_t numOfColumns; // used by normal table
int32_t numOfTags;
SSchema *schema;
} SSTableObj;
typedef struct SFuncObj {
......@@ -284,21 +275,22 @@ typedef struct {
typedef struct {
int32_t len;
void *rsp;
} SMnRsp;
} SMnodeRsp;
typedef struct SMnodeMsg {
SMnode *pMnode;
void (*fp)(SMnodeMsg *pMsg, int32_t code);
SRpcConnInfo conn;
SUserObj *pUser;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime;
SMnRsp rpcRsp;
SRpcMsg rpcMsg;
char pCont[];
SUserObj *pUser;
int16_t received;
int16_t successed;
int16_t expected;
int16_t retry;
int32_t code;
int64_t createdTime;
SMnodeRsp rpcRsp;
SRpcMsg rpcMsg;
char pCont[];
} SMnodeMsg;
#ifdef __cplusplus
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitDnode();
void mnodeCleanupDnode();
int32_t mndInitDnode();
void mndCleanupDnode();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitFunc();
void mnodeCleanupFunc();
int32_t mndInitFunc();
void mndCleanupFunc();
#ifdef __cplusplus
}
......
......@@ -18,23 +18,21 @@
#include "mndDef.h"
#include "sdb.h"
#include "tstep.h"
#include "tqueue.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef struct SMnodeBak {
int32_t dnodeId;
int64_t clusterId;
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
SMnodeOpt para;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnodeBak;
typedef struct {
const char *name;
MndInitFp initFp;
MndCleanupFp cleanupFp;
} SMnodeStep;
typedef struct SMnode {
int32_t dnodeId;
......@@ -43,25 +41,24 @@ typedef struct SMnode {
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
struct SSdb *pSdb;
struct SDnode *pServer;
SSdb *pSdb;
SDnode *pDnode;
SArray steps;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp;
PutMsgToMnodeQFp putMsgToApplyMsgFp;
} SMnode;
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
int64_t mnodeGetClusterId();
tmr_h mndGetTimer(SMnode *pMnode);
int32_t mndGetDnodeId(SMnode *pMnode);
int64_t mndGetClusterId(SMnode *pMnode);
void mnodeSendMsgToDnode(SMnode *pMnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg);
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell);
void mnodeSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp);
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp);
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitMnode();
void mnodeCleanupMnode();
int32_t mndInitMnode();
void mndCleanupMnode();
void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect);
......
......@@ -20,8 +20,8 @@
extern "C" {
#endif
int32_t mnodeInitOper();
void mnodeCleanupOper();
int32_t mndInitOper();
void mndCleanupOper();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitProfile();
void mnodeCleanupProfile();
int32_t mndInitProfile();
void mndCleanupProfile();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitShow();
void mnodeCleanUpShow();
int32_t mndInitShow();
void mndCleanupShow();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitStable();
void mnodeCleanupStable();
int32_t mndInitStable();
void mndCleanupStable();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitSync();
void mnodeCleanUpSync();
int32_t mndInitSync();
void mndCleanupSync();
int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData);
bool mnodeIsMaster();
......
......@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "mndInt.h"
int32_t mnodeInitTelem();
void mnodeCleanupTelem();
int32_t mndInitTelem();
void mndCleanupTelem();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitTrans();
void mnodeCleanupTrans();
int32_t mndInitTrans();
void mndCleanupTrans();
STrans *trnCreate(ETrnPolicy policy, void *rpcHandle);
void trnDrop(STrans *pTrans);
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitUser();
void mnodeCleanupUser();
int32_t mndInitUser();
void mndCleanupUser();
#ifdef __cplusplus
}
......
......@@ -22,8 +22,8 @@
extern "C" {
#endif
int32_t mnodeInitVgroup();
void mnodeCleanupVgroup();
int32_t mndInitVgroup();
void mndCleanupVgroup();
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_INT_H_
#define _TD_MND_INT_H_
#include "mndDef.h"
#include "sdb.h"
#include "tstep.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg);
typedef struct SMnodeBak {
int32_t dnodeId;
int64_t clusterId;
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
SMnodeOpt para;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnodeBak;
typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
struct SSdb *pSdb;
struct SDnode *pServer;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp;
} SMnode;
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
int64_t mnodeGetClusterId();
void mnodeSendMsgToDnode(SMnode *pMnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg);
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell);
void mnodeSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_INT_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_USER_H_
#define _TD_MND_USER_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mnodeInitUser();
void mnodeCleanupUser();
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_USER_H_*/
......@@ -101,7 +101,7 @@ static int32_t mnodeCreateDefaultAcct() {
return sdbWrite(pRaw);
}
int32_t mnodeInitAcct() {
int32_t mndInitAcct() {
SSdbTable table = {.sdbType = SDB_ACCT,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mnodeCreateDefaultAcct,
......@@ -115,4 +115,4 @@ int32_t mnodeInitAcct() {
return 0;
}
void mnodeCleanupAcct() {}
void mndCleanupAcct() {}
......@@ -17,9 +17,9 @@
#include "os.h"
#include "mndAuth.h"
int32_t mnodeInitAuth() { return 0; }
void mnodeCleanupAuth() {}
int32_t mndInitAuth() { return 0; }
void mndCleanupAuth() {}
int32_t mnodeRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return 0;
}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitBalance() { return 0; }
void mnodeCleanupBalance() {}
\ No newline at end of file
int32_t mndInitBalance() { return 0; }
void mndCleanupBalance() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitCluster() { return 0; }
void mnodeCleanupCluster() {}
\ No newline at end of file
int32_t mndInitCluster() { return 0; }
void mndCleanupCluster() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitDb() { return 0; }
void mnodeCleanupDb() {}
\ No newline at end of file
int32_t mndInitDb() { return 0; }
void mndCleanupDb() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitDnode() { return 0; }
void mnodeCleanupDnode() {}
\ No newline at end of file
int32_t mndInitDnode() { return 0; }
void mndCleanupDnode() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitFunc() { return 0; }
void mnodeCleanupFunc() {}
\ No newline at end of file
int32_t mndInitFunc() { return 0; }
void mndCleanupFunc() {}
\ No newline at end of file
......@@ -17,8 +17,8 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitMnode() { return 0; }
void mnodeCleanupMnode() {}
int32_t mndInitMnode() { return 0; }
void mndCleanupMnode() {}
void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {}
void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitOper() { return 0; }
void mnodeCleanupOper() {}
\ No newline at end of file
int32_t mndInitOper() { return 0; }
void mndCleanupOper() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitProfile() { return 0; }
void mnodeCleanupProfile() {}
\ No newline at end of file
int32_t mndInitProfile() { return 0; }
void mndCleanupProfile() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitShow() { return 0; }
void mnodeCleanUpShow() {}
\ No newline at end of file
int32_t mndInitShow() { return 0; }
void mndCleanupShow() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitStable() { return 0; }
void mnodeCleanupStable() {}
\ No newline at end of file
int32_t mndInitStable() { return 0; }
void mndCleanupStable() {}
\ No newline at end of file
......@@ -18,8 +18,8 @@
#include "mndInt.h"
#include "mndTrans.h"
int32_t mnodeInitSync() { return 0; }
void mnodeCleanUpSync() {}
int32_t mndInitSync() { return 0; }
void mndCleanupSync() {}
int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) {
trnApply(pData, pData, 0);
......
......@@ -174,7 +174,7 @@ static void mnodeAddVersionInfo(SBufferWriter* bw) {
static void mnodeAddRuntimeInfo(SBufferWriter* bw) {
SMnodeLoad load = {0};
if (mnodeGetLoad(NULL, &load) != 0) {
if (mndGetLoad(NULL, &load) != 0) {
return;
}
......@@ -203,7 +203,7 @@ static void mnodeSendTelemetryReport() {
return;
}
int64_t clusterId = mnodeGetClusterId();
int64_t clusterId = mndGetClusterId(NULL);
char clusterIdStr[20] = {0};
snprintf(clusterIdStr, sizeof(clusterIdStr), "%" PRId64, clusterId);
......@@ -278,7 +278,7 @@ static void mnodeGetEmail(char* filepath) {
taosCloseFile(fd);
}
int32_t mnodeInitTelem() {
int32_t mndInitTelem() {
tsTelem.enable = tsEnableTelemetryReporting;
if (!tsTelem.enable) return 0;
......@@ -303,7 +303,7 @@ int32_t mnodeInitTelem() {
return 0;
}
void mnodeCleanupTelem() {
void mndCleanupTelem() {
if (!tsTelem.enable) return;
if (taosCheckPthreadValid(tsTelem.thread)) {
......
......@@ -312,7 +312,7 @@ int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
return code;
}
int32_t mnodeInitTrans() {
int32_t mndInitTrans() {
SSdbTable table = {.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)trnActionEncode,
......@@ -326,7 +326,7 @@ int32_t mnodeInitTrans() {
return 0;
}
void mnodeCleanupTrans() { mInfo("trn module is cleaned up"); }
void mndCleanupTrans() { mInfo("trn module is cleaned up"); }
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
......
......@@ -220,7 +220,7 @@ static int32_t mnodeProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
int32_t mnodeInitUser() {
int32_t mndInitUser() {
SSdbTable table = {.sdbType = SDB_USER,
.keyType = SDB_KEY_BINARY,
.deployFp = (SdbDeployFp)mnodeCreateDefaultUsers,
......@@ -231,9 +231,9 @@ int32_t mnodeInitUser() {
.deleteFp = (SdbDeleteFp)mnodeUserActionDelete};
sdbSetTable(table);
mnodeSetMsgHandle(NULL, TSDB_MSG_TYPE_CREATE_USER, mnodeProcessCreateUserMsg);
mndSetMsgHandle(NULL, TSDB_MSG_TYPE_CREATE_USER, mnodeProcessCreateUserMsg);
return 0;
}
void mnodeCleanupUser() {}
\ No newline at end of file
void mndCleanupUser() {}
\ No newline at end of file
......@@ -17,5 +17,5 @@
#include "os.h"
#include "mndInt.h"
int32_t mnodeInitVgroup() { return 0; }
void mnodeCleanupVgroup() {}
\ No newline at end of file
int32_t mndInitVgroup() { return 0; }
void mndCleanupVgroup() {}
\ No newline at end of file
......@@ -14,9 +14,6 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tqueue.h"
#include "mndAcct.h"
#include "mndAuth.h"
#include "mndBalance.h"
......@@ -35,55 +32,70 @@
#include "mndUser.h"
#include "mndVgroup.h"
SMnodeBak tsMint = {0};
int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; }
int32_t mndGetDnodeId(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->dnodeId;
}
return -1;
}
int64_t mnodeGetClusterId() { return tsMint.para.clusterId; }
int64_t mndGetClusterId(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->clusterId;
}
return -1;
}
void mnodeSendMsgToDnode(SMnode *pMnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg) {
assert(pMnode);
(*pMnode->sendMsgToDnodeFp)(pMnode->pServer, epSet, rpcMsg);
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToDnodeFp != NULL) {
(*pMnode->sendMsgToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
}
}
void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) {
assert(pMnode);
(*pMnode->sendMsgToMnodeFp)(pMnode->pServer, rpcMsg);
void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendMsgToMnodeFp != NULL) {
(*pMnode->sendMsgToMnodeFp)(pMnode->pDnode, pMsg);
}
}
void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) {
assert(pMnode);
(*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg);
void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendRedirectMsgFp != NULL) {
(*pMnode->sendRedirectMsgFp)(pMnode->pDnode, pMsg);
}
}
static int32_t mnodeInitTimer() {
if (tsMint.timer == NULL) {
tsMint.timer = taosTmrInit(tsMaxShellConns, 200, 3600000, "MND");
static int32_t mndInitTimer(SMnode *pMnode) {
if (pMnode->timer == NULL) {
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
}
if (tsMint.timer == NULL) {
if (pMnode->timer == NULL) {
return -1;
}
return 0;
}
static void mnodeCleanupTimer() {
if (tsMint.timer != NULL) {
taosTmrCleanUp(tsMint.timer);
tsMint.timer = NULL;
static void mndCleanupTimer(SMnode *pMnode) {
if (pMnode->timer != NULL) {
taosTmrCleanUp(pMnode->timer);
pMnode->timer = NULL;
}
}
tmr_h mnodeGetTimer() { return tsMint.timer; }
tmr_h mndGetTimer(SMnode *pMnode) {
if (pMnode != NULL) {
return pMnode->timer;
}
}
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOption->clusterId;
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOption->pDnode;
pMnode->pDnode = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
......@@ -98,88 +110,143 @@ static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
return 0;
}
static int32_t mnodeAllocInitSteps() {
struct SSteps *steps = taosStepInit(16, NULL);
if (steps == NULL) return -1;
if (taosStepAdd(steps, "mnode-trans", mnodeInitTrans, mnodeCleanupTrans) != 0) return -1;
if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1;
if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1;
if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1;
if (taosStepAdd(steps, "mnode-acct", mnodeInitAcct, mnodeCleanupAcct) != 0) return -1;
if (taosStepAdd(steps, "mnode-auth", mnodeInitAuth, mnodeCleanupAuth) != 0) return -1;
if (taosStepAdd(steps, "mnode-user", mnodeInitUser, mnodeCleanupUser) != 0) return -1;
if (taosStepAdd(steps, "mnode-db", mnodeInitDb, mnodeCleanupDb) != 0) return -1;
if (taosStepAdd(steps, "mnode-vgroup", mnodeInitVgroup, mnodeCleanupVgroup) != 0) return -1;
if (taosStepAdd(steps, "mnode-stable", mnodeInitStable, mnodeCleanupStable) != 0) return -1;
if (taosStepAdd(steps, "mnode-func", mnodeInitFunc, mnodeCleanupFunc) != 0) return -1;
if (taosStepAdd(steps, "mnode-sdb", sdbInit, sdbCleanup) != 0) return -1;
tsMint.pInitSteps = steps;
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
SMnodeStep step = {0};
step.name = name;
step.initFp = initFp;
step.cleanupFp = cleanupFp;
if (taosArrayPush(&pMnode->steps, &step) != NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to alloc step:%s since %s", name, terrstr());
return -1;
}
return 0;
}
static int32_t mnodeAllocStartSteps() {
struct SSteps *steps = taosStepInit(8, NULL);
if (steps == NULL) return -1;
static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stable", mndInitStable, mndCleanupStable) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sdb", sdbInit, sdbCleanup) != 0) return -1;
if (pMnode->replica == 1) {
if (mndAllocStep(pMnode, "mnode-deploy-sdb", sdbDeploy, sdbClose) != 0) return -1;
} else {
if (mndAllocStep(pMnode, "mnode-open-sdb", sdbOpen, sdbClose) != 0) return -1;
}
taosStepAdd(steps, "mnode-timer", mnodeInitTimer, NULL);
taosStepAdd(steps, "mnode-sdb-file", sdbOpen, sdbClose);
taosStepAdd(steps, "mnode-balance", mnodeInitBalance, mnodeCleanupBalance);
taosStepAdd(steps, "mnode-profile", mnodeInitProfile, mnodeCleanupProfile);
taosStepAdd(steps, "mnode-show", mnodeInitShow, mnodeCleanUpShow);
taosStepAdd(steps, "mnode-sync", mnodeInitSync, mnodeCleanUpSync);
taosStepAdd(steps, "mnode-telem", mnodeInitTelem, mnodeCleanupTelem);
taosStepAdd(steps, "mnode-timer", NULL, mnodeCleanupTimer);
if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sdb-file", sdbOpen, sdbClose) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-balance", mndInitBalance, mndCleanupBalance) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
tsMint.pStartSteps = steps;
return 0;
}
SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption) {
SMnode *pMnode = calloc(1, sizeof(SMnode));
if (mnodeSetOptions(pMnode, pOption) != 0) {
free(pMnode);
mError("failed to init mnode options since %s", terrstr());
return NULL;
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
if (pos == -1) {
pos = taosArrayGetSize(&pMnode->steps);
}
if (mnodeAllocInitSteps() != 0) {
mError("failed to alloc init steps since %s", terrstr());
return NULL;
for (int32_t s = pos; s >= 0; s--) {
SMnodeStep *pStep = taosArrayGet(&pMnode->steps, pos);
mDebug("step:%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)(pMnode);
}
}
if (mnodeAllocStartSteps() != 0) {
mError("failed to alloc start steps since %s", terrstr());
return NULL;
}
taosArrayClear(&pMnode->steps);
}
taosStepExec(tsMint.pInitSteps);
static int32_t mndExecSteps(SMnode *pMnode) {
int32_t size = taosArrayGetSize(&pMnode->steps);
for (int32_t pos = 0; pos < size; pos++) {
SMnodeStep *pStep = taosArrayGet(&pMnode->steps, pos);
if (pStep->initFp == NULL) continue;
if (tsMint.para.dnodeId <= 0 && tsMint.para.clusterId <= 0) {
if (sdbDeploy() != 0) {
mError("failed to deploy sdb since %s", terrstr());
return NULL;
// (*pMnode->reportProgress)(pStep->name, "start initialize");
int32_t code = (*pStep->initFp)(pMnode);
if (code != 0) {
mError("step:%s exec failed since %s, start to cleanup", pStep->name, tstrerror(code));
mndCleanupSteps(pMnode, pos);
terrno = code;
return code;
} else {
mInfo("mnode is deployed");
mDebug("step:%s is initialized", pStep->name);
}
// (*pMnode->reportProgress)(pStep->name, "initialize completed");
}
}
taosStepExec(tsMint.pStartSteps);
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
SMnode *pMnode = calloc(1, sizeof(SMnode));
int32_t code = mndSetOptions(pMnode, pOption);
if (code != 0) {
mndClose(pMnode);
terrno = code;
mError("failed to set mnode options since %s", terrstr());
return NULL;
}
code = mndInitSteps(pMnode);
if (code != 0) {
mndClose(pMnode);
terrno = code;
mError("failed to int steps since %s", terrstr());
return NULL;
}
code = mndExecSteps(pMnode);
if (code != 0) {
mndClose(pMnode);
terrno = code;
mError("failed to execute steps since %s", terrstr());
return NULL;
}
mDebug("mnode:%p object is created", pMnode);
return pMnode;
}
void mnodeClose(SMnode *pMnode) { free(pMnode); }
void mndClose(SMnode *pMnode) {
mndCleanupSteps(pMnode, -1);
free(pMnode);
mDebug("mnode:%p object is cleaned up", pMnode);
}
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; }
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
assert(1);
return 0;
}
void mnodeDestroy(const char *path) { sdbUnDeploy(); }
void mndDestroy(const char *path) {
mDebug("mnode in %s will be destroyed", path);
sdbUnDeploy();
}
int32_t mnodeGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; }
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
assert(1);
return 0;
}
SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -187,7 +254,7 @@ SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
}
if (rpcGetConnInfo(pRpcMsg->handle, &pMsg->conn) != 0) {
mnodeCleanupMsg(pMsg);
mndCleanupMsg(pMsg);
mError("can not get user from conn:%p", pMsg->rpcMsg.handle);
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
return NULL;
......@@ -199,7 +266,7 @@ SMnodeMsg *mnodeInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return pMsg;
}
void mnodeCleanupMsg(SMnodeMsg *pMsg) {
void mndCleanupMsg(SMnodeMsg *pMsg) {
if (pMsg->pUser != NULL) {
sdbRelease(pMsg->pUser);
}
......@@ -207,40 +274,50 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
void mnodeSendRsp(SMnodeMsg *pMsg, int32_t code) {}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {}
static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) {
if (!mnodeIsMaster()) {
mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true);
mnodeCleanupMsg(pMsg);
static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
if (!mnodeIsMaster(pMnode)) {
mndSendRedirectMsg(pMnode, &pMsg->rpcMsg);
mndCleanupMsg(pMsg);
return;
}
int32_t msgType = pMsg->rpcMsg.msgType;
MndMsgFp fp = tsMint.msgFp[msgType];
int32_t msgType = pMsg->rpcMsg.msgType;
MndMsgFp fp = pMnode->msgFp[msgType];
if (fp == NULL) {
mError("RPC %p, req:%s is not processed", pMsg->rpcMsg.handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
mndCleanupMsg(pMsg);
return;
}
int32_t code = (*fp)(NULL, pMsg);
int32_t code = (*fp)(pMnode, pMsg);
if (code != 0) {
assert(code);
mError("RPC %p, req:%s processed error since %s", pMsg->rpcMsg.handle, taosMsg[msgType], tstrerror(code));
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_MSG_NOT_PROCESSED};
rpcSendResponse(&rspMsg);
}
mndCleanupMsg(pMsg);
}
void mnodeSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) {
if (msgType >= 0 && msgType < TSDB_MSG_TYPE_MAX) {
tsMint.msgFp[msgType] = fp;
pMnode->msgFp[msgType] = fp;
}
}
void mnodeProcessReadMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
void mndProcessReadMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mnodeProcessWriteMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mnodeProcessSyncMsg(SMnodeMsg *pMsg) { mnodeProcessRpcMsg(pMsg); }
void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mnodeProcessApplyMsg(SMnodeMsg *pMsg) {}
void mndProcessApplyMsg(SMnodeMsg *pMsg) {}
#if 0
......@@ -256,7 +333,7 @@ static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) {
}
if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp;
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet;
......@@ -278,7 +355,7 @@ static void mnodeProcessWriteReq(SMnodeMsg *pMsg, void *unused) {
code = (*tsMworker.writeMsgFp[msgType])(pMsg);
PROCESS_WRITE_REQ_END:
mnodeSendRsp(pMsg, code);
mndSendRsp(pMsg, code);
}
static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) {
......@@ -293,7 +370,7 @@ static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) {
}
if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp;
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
if (!epSet) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -319,7 +396,7 @@ static void mnodeProcessReadReq(SMnodeMsg *pMsg, void *unused) {
code = (*tsMworker.readMsgFp[msgType])(pMsg);
PROCESS_READ_REQ_END:
mnodeSendRsp(pMsg, code);
mndSendRsp(pMsg, code);
}
static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) {
......@@ -334,7 +411,7 @@ static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) {
}
if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp;
SMnodeRsp *rpcRsp = &pMsg->rpcRsp;
SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
mnodeGetMnodeEpSetForPeer(epSet, true);
rpcRsp->rsp = epSet;
......@@ -356,7 +433,7 @@ static void mnodeProcessPeerReq(SMnodeMsg *pMsg, void *unused) {
code = (*tsMworker.peerReqFp[msgType])(pMsg);
PROCESS_PEER_REQ_END:
mnodeSendRsp(pMsg, code);
mndSendRsp(pMsg, code);
}
static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) {
......@@ -365,7 +442,7 @@ static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) {
if (!mnodeIsMaster()) {
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
mnodeCleanupMsg2(pMsg);
mndCleanupMsg2(pMsg);
}
if (tsMworker.peerRspFp[msgType]) {
......@@ -374,6 +451,6 @@ static void mnodeProcessPeerRsp(SMnodeMsg *pMsg, void *unused) {
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
}
mnodeCleanupMsg2(pMsg);
mndCleanupMsg2(pMsg);
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册