提交 e168709b 编写于 作者: S Shengliang Guan

[TD-10430] refact interface of dnode

上级 9fae7a3d
...@@ -64,9 +64,6 @@ typedef struct SRpcInit { ...@@ -64,9 +64,6 @@ typedef struct SRpcInit {
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int idleTime; // milliseconds, 0 means idle timer is disabled
// owner of the rpc client/server,
void *owner; // set by the app when rpc init
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
char spi; // security parameter index char spi; // security parameter index
...@@ -75,10 +72,10 @@ typedef struct SRpcInit { ...@@ -75,10 +72,10 @@ typedef struct SRpcInit {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(void *owner, SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SRpcEpSet *);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(void *owner, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
} SRpcInit; } SRpcInit;
int32_t rpcInit(); int32_t rpcInit();
......
...@@ -22,58 +22,50 @@ extern "C" { ...@@ -22,58 +22,50 @@ extern "C" {
struct SRpcEpSet; struct SRpcEpSet;
struct SRpcMsg; struct SRpcMsg;
struct Dnode;
/** /**
* Initialize and start the dnode module. * Initialize and start the dnode module.
* *
* @return Instance of dnode module. * @return Error code.
*/ */
struct Dnode *dnodeCreateInstance(); int32_t dnodeInit();
/** /**
* Stop and cleanup dnode module. * Stop and cleanup dnode module.
*
* @param dnode, instance of dnode module.
*/ */
void dnodeDropInstance(struct Dnode *dnode); void dnodeCleanup();
/** /**
* Send messages to other dnodes, such as create vnode message. * Send messages to other dnodes, such as create vnode message.
* *
* @param dnode, the instance of Dnode module.
* @param epSet, the endpoint list of the dnodes. * @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void dnodeSendMsgToDnode(struct Dnode *dnode, struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/** /**
* Send messages to mnode, such as config message. * Send messages to mnode, such as config message.
* *
* @param dnode, the instance of dnode module.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void dnodeSendMsgToMnode(struct Dnode *dnode, struct SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
/** /**
* Send redirect message to dnode or shell. * Send redirect message to dnode or shell.
* *
* @param dnode, the instance of dnode module.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
* @param forShell, used to identify whether to send to shell or dnode. * @param forShell, used to identify whether to send to shell or dnode.
*/ */
void dnodeSendRedirectMsg(struct Dnode *dnode, struct SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
/** /**
* Get the corresponding endpoint information from dnodeId. * Get the corresponding endpoint information from dnodeId.
* *
* @param dnode, the instance of dnode module.
* @param dnodeId, the id ot dnode. * @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode. * @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode. * @param fqdn, the fqdn of dnode.
* @param port, the port of dnode. * @param port, the port of dnode.
*/ */
void dnodeGetDnodeEp(struct Dnode *dnode, int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,34 +20,29 @@ ...@@ -20,34 +20,29 @@
extern "C" { extern "C" {
#endif #endif
struct Dnode;
typedef struct { typedef struct {
/** /**
* Send messages to other dnodes, such as create vnode message. * Send messages to other dnodes, such as create vnode message.
* *
* @param dnode, the instance of dnode module.
* @param epSet, the endpoint list of the dnodes. * @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void (*SendMsgToDnode)(struct Dnode *dnode, struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/** /**
* Send messages to mnode, such as config message. * Send messages to mnode, such as config message.
* *
* @param dnode, the instance of dnode module.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void (*SendMsgToMnode)(struct Dnode *dnode, struct SRpcMsg *rpcMsg); void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg);
/** /**
* Send redirect message to dnode or shell. * Send redirect message to dnode or shell.
* *
* @param dnode, the instance of dnode module.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
* @param forShell, used to identify whether to send to shell or dnode. * @param forShell, used to identify whether to send to shell or dnode.
*/ */
void (*SendRedirectMsg)(struct Dnode *dnode, struct SRpcMsg *rpcMsg, bool forShell); void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell);
/** /**
* Get the corresponding endpoint information from dnodeId. * Get the corresponding endpoint information from dnodeId.
...@@ -58,12 +53,11 @@ typedef struct { ...@@ -58,12 +53,11 @@ typedef struct {
* @param fqdn, the fqdn of dnode. * @param fqdn, the fqdn of dnode.
* @param port, the port of dnode. * @param port, the port of dnode.
*/ */
void (*GetDnodeEp)(struct Dnode *dnode, int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
} SMnodeFp; } SMnodeFp;
typedef struct { typedef struct {
struct Dnode *dnode;
SMnodeFp fp; SMnodeFp fp;
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN];
int32_t dnodeId; int32_t dnodeId;
...@@ -73,40 +67,34 @@ typedef struct { ...@@ -73,40 +67,34 @@ typedef struct {
* Initialize and start mnode module. * Initialize and start mnode module.
* *
* @param para, initialization parameters. * @param para, initialization parameters.
* @return Instance of mnode module. * @return Error code.
*/ */
struct Mnode *mnodeCreateInstance(SMnodePara para); int32_t mnodeInit(SMnodePara para);
/** /**
* Stop and cleanup mnode module. * Stop and cleanup mnode module.
*
* @param mnode, instance of mnode module.
*/ */
void mnodeDropInstance(struct Mnode *vnode); void mnodeCleanup();
/** /**
* Deploy mnode instances in dnode. * Deploy mnode instances in dnode.
* *
* @param mnode, instance of mnode module.
* @param minfos, server information used to deploy the mnode instance. * @param minfos, server information used to deploy the mnode instance.
* @return Error Code. * @return Error Code.
*/ */
int32_t mnodeDeploy(struct Mnode *mnode, struct SMInfos *minfos); int32_t mnodeDeploy(struct SMInfos *minfos);
/** /**
* Delete the mnode instance deployed in dnode. * Delete the mnode instance deployed in dnode.
*
* @param mnode, instance of mnode module.
*/ */
void mnodeUnDeploy(struct Mnode *mnode); void mnodeUnDeploy();
/** /**
* Whether the mnode is in service. * Whether the mnode is in service.
* *
* @param mnode, instance of mnode module.
* @return Server status. * @return Server status.
*/ */
bool mnodeIsServing(struct Mnode *mnode); bool mnodeIsServing();
typedef struct { typedef struct {
int64_t numOfDnode; int64_t numOfDnode;
...@@ -124,16 +112,14 @@ typedef struct { ...@@ -124,16 +112,14 @@ typedef struct {
/** /**
* Get the statistical information of Mnode. * Get the statistical information of Mnode.
* *
* @param mnode, instance of mnode module.
* @param stat, statistical information. * @param stat, statistical information.
* @return Error Code. * @return Error Code.
*/ */
int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat); int32_t mnodeGetStatistics(SMnodeStat *stat);
/** /**
* Get the statistical information of Mnode. * Get the statistical information of Mnode.
* *
* @param mnode, instance of mnode module.
* @param user, username. * @param user, username.
* @param spi, security parameter index. * @param spi, security parameter index.
* @param encrypt, encrypt algorithm. * @param encrypt, encrypt algorithm.
...@@ -141,16 +127,15 @@ int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat); ...@@ -141,16 +127,15 @@ int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat);
* @param ckey, ciphering key. * @param ckey, ciphering key.
* @return Error Code. * @return Error Code.
*/ */
int32_t mnodeRetriveAuth(struct Mnode *mnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
/** /**
* Interface for processing messages. * Interface for processing messages.
* *
* @param mnode, instance of mnode module.
* @param rpcMsg, message to be processed. * @param rpcMsg, message to be processed.
* @return Error code. * @return Error code.
*/ */
void mnodeProcessMsg(struct Mnode *mnode, SRpcMsg *rpcMsg); void mnodeProcessMsg(SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,41 +20,35 @@ ...@@ -20,41 +20,35 @@
extern "C" { extern "C" {
#endif #endif
struct Dnode;
typedef struct { typedef struct {
/** /**
* Send messages to other dnodes, such as create vnode message. * Send messages to other dnodes, such as create vnode message.
* *
* @param dnode, the instance of dnode module.
* @param epSet, the endpoint list of dnodes. * @param epSet, the endpoint list of dnodes.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void (*SendMsgToDnode)(struct Dnode *dnode, struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
/** /**
* Send messages to mnode, such as config message. * Send messages to mnode, such as config message.
* *
* @param dnode, the instance of dnode module.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void (*SendMsgToMnode)(struct Dnode *dnode, struct SRpcMsg *rpcMsg); void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg);
/** /**
* Get the corresponding endpoint information from dnodeId. * Get the corresponding endpoint information from dnodeId.
* *
* @param dnode, the instance of dnode module.
* @param dnodeId, the id ot dnode. * @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode. * @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode. * @param fqdn, the fqdn of dnode.
* @param port, the port of dnode. * @param port, the port of dnode.
*/ */
void (*GetDnodeEp)(struct Dnode *dnode, int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
} SVnodeFp; } SVnodeFp;
typedef struct { typedef struct {
struct Dnode *dnode;
SVnodeFp fp; SVnodeFp fp;
} SVnodePara; } SVnodePara;
...@@ -62,16 +56,14 @@ typedef struct { ...@@ -62,16 +56,14 @@ typedef struct {
* Start initialize vnode module. * Start initialize vnode module.
* *
* @param para, initialization parameters. * @param para, initialization parameters.
* @return Instance of vnode module. * @return Error code.
*/ */
struct Vnode *vnodeCreateInstance(SVnodePara para); int32_t vnodeInit(SVnodePara para);
/** /**
* Cleanup vnode module. * Cleanup vnode module.
*
* @param vnode, instance of vnode module.
*/ */
void vnodeDropInstance(struct Vnode *vnode); void vnodeCleanup();
typedef struct { typedef struct {
int32_t unused; int32_t unused;
...@@ -80,36 +72,32 @@ typedef struct { ...@@ -80,36 +72,32 @@ typedef struct {
/** /**
* Get the statistical information of vnode. * Get the statistical information of vnode.
* *
* @param vnode, instance of vnode module. * @param stat, statistical information.
* @param sta, statistical information.
* @return Error Code. * @return Error Code.
*/ */
int32_t vnodeGetStatistics(struct Vnode *vnode, SVnodeStat *stat); int32_t vnodeGetStatistics(SVnodeStat *stat);
/** /**
* Get the status of all vnodes. * Get the status of all vnodes.
* *
* @param vnode, instance of vnode module.
* @param status, status msg. * @param status, status msg.
*/ */
void vnodeGetStatus(struct Vnode *vnode, struct SStatusMsg *status); void vnodeGetStatus(struct SStatusMsg *status);
/** /**
* Set access permissions for all vnodes. * Set access permissions for all vnodes.
* *
* @param vnode, instance of vnode module.
* @param access, access permissions of vnodes. * @param access, access permissions of vnodes.
* @param numOfVnodes, the size of vnodes. * @param numOfVnodes, the size of vnodes.
*/ */
void vnodeSetAccess(struct Vnode *vnode, struct SVgroupAccess *access, int32_t numOfVnodes); void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes);
/** /**
* Interface for processing messages. * Interface for processing messages.
* *
* @param vnode, instance of vnode module.
* @param msg, message to be processed. * @param msg, message to be processed.
*/ */
void vnodeProcessMsg(struct Vnode *vnode, SRpcMsg *msg); void vnodeProcessMsg(SRpcMsg *msg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,29 +20,14 @@ ...@@ -20,29 +20,14 @@
extern "C" { extern "C" {
#endif #endif
typedef int32_t (*FnInitObj)(void *parent, void **self); typedef int32_t (*InitFp)(void **obj);
typedef void (*FnCleanupObj)(void **self); typedef void (*CleanupFp)(void **obj);
typedef void (*FnReportProgress)(void *parent, const char *name, const char *desc); typedef void (*ReportFp)(char *name, char *desc);
typedef struct SStepObj { struct SSteps *taosStepInit(int32_t maxsize, ReportFp fp);
const char * name; int32_t taosStepExec(struct SSteps *steps);
void * parent; void taosStepCleanup(struct SSteps *steps);
void ** self; int32_t taosStepAdd(struct SSteps *steps, char *name, void **obj, InitFp initFp, CleanupFp cleanupFp);
FnInitObj initFp;
FnCleanupObj cleanupFp;
FnReportProgress reportFp;
} SStepObj;
typedef struct SSteps {
int32_t cursize;
int32_t maxsize;
SStepObj *steps;
} SSteps;
SSteps *taosStepInit(int32_t stepsize);
int32_t taosStepAdd(SSteps *steps, SStepObj *step);
int32_t taosStepExec(SSteps *steps);
void taosStepCleanup(SSteps *steps);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,9 +54,8 @@ typedef struct { ...@@ -54,9 +54,8 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void *owner; void (*cfp)(SRpcMsg *, SRpcEpSet *);
void (*cfp)(void * owner, SRpcMsg *, SRpcEpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int (*afp)(void * owner, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount; int32_t refCount;
void *idPool; // handle to ID pool void *idPool; // handle to ID pool
...@@ -259,7 +258,6 @@ void *rpcOpen(const SRpcInit *pInit) { ...@@ -259,7 +258,6 @@ void *rpcOpen(const SRpcInit *pInit) {
if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret));
if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey));
pRpc->spi = pInit->spi; pRpc->spi = pInit->spi;
pRpc->owner = pInit->owner;
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->refCount = 1; pRpc->refCount = 1;
...@@ -742,7 +740,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -742,7 +740,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
if (pConn->user[0] == 0) { if (pConn->user[0] == 0) {
terrno = TSDB_CODE_RPC_AUTH_REQUIRED; terrno = TSDB_CODE_RPC_AUTH_REQUIRED;
} else { } else {
terrno = (*pRpc->afp)(pRpc->owner, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey);
} }
if (terrno != 0) { if (terrno != 0) {
...@@ -1022,7 +1020,7 @@ static void doRpcReportBrokenLinkToServer(void *param, void *id) { ...@@ -1022,7 +1020,7 @@ static void doRpcReportBrokenLinkToServer(void *param, void *id) {
SRpcMsg *pRpcMsg = (SRpcMsg *)(param); SRpcMsg *pRpcMsg = (SRpcMsg *)(param);
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
(*(pRpc->cfp))(pRpc->owner, pRpcMsg, NULL); (*(pRpc->cfp))(pRpcMsg, NULL);
free(pRpcMsg); free(pRpcMsg);
} }
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
...@@ -1137,7 +1135,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { ...@@ -1137,7 +1135,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect)
pEpSet = &pContext->epSet; pEpSet = &pContext->epSet;
(*pRpc->cfp)(pRpc->owner, pMsg, pEpSet); (*pRpc->cfp)(pMsg, pEpSet);
} }
// free the request message // free the request message
...@@ -1161,7 +1159,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1161,7 +1159,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcAddRef(pRpc); // add the refCount for requests rpcAddRef(pRpc); // add the refCount for requests
// notify the server app // notify the server app
(*(pRpc->cfp))(pRpc->owner, &rpcMsg, NULL); (*(pRpc->cfp))(&rpcMsg, NULL);
} else { } else {
// it's a response // it's a response
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
......
...@@ -194,7 +194,7 @@ void taosStopTcpServer(void *handle) { ...@@ -194,7 +194,7 @@ void taosStopTcpServer(void *handle) {
pServerObj->stop = 1; pServerObj->stop = 1;
if (pServerObj->fd >= 0) { if (pServerObj->fd >= 0) {
taosShutDownSocketRD(pServerObj->fd); taosShutDownSocketRD(pServerObj->fd);
} }
if (taosCheckPthreadValid(pServerObj->thread)) { if (taosCheckPthreadValid(pServerObj->thread)) {
if (taosComparePthread(pServerObj->thread, pthread_self())) { if (taosComparePthread(pServerObj->thread, pthread_self())) {
...@@ -257,8 +257,8 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -257,8 +257,8 @@ static void *taosAcceptTcpConnection(void *arg) {
int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to));
if (ret != 0) { if (ret != 0) {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to set recv timeout fd(%s)for connection from:%hu", pServerObj->label, strerror(errno), tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno),
htons(caddr.sin_port)); taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
continue; continue;
} }
...@@ -270,12 +270,12 @@ static void *taosAcceptTcpConnection(void *arg) { ...@@ -270,12 +270,12 @@ static void *taosAcceptTcpConnection(void *arg) {
if (pFdObj) { if (pFdObj) {
pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->ip = caddr.sin_addr.s_addr;
pFdObj->port = htons(caddr.sin_port); pFdObj->port = htons(caddr.sin_port);
tDebug("%s new TCP connection from %hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label,
pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds);
} else { } else {
taosCloseSocket(connFd); taosCloseSocket(connFd);
tError("%s failed to malloc FdObj(%s) for connection from:%hu", pServerObj->label, strerror(errno), tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
htons(caddr.sin_port)); taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port));
} }
// pick up next thread for next connection // pick up next thread for next connection
......
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnCfg { typedef struct DnCfg {
Dnode * dnode;
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN];
...@@ -30,7 +29,7 @@ typedef struct DnCfg { ...@@ -30,7 +29,7 @@ typedef struct DnCfg {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} DnCfg; } DnCfg;
int32_t dnodeInitCfg(Dnode *dnode, DnCfg **cfg); int32_t dnodeInitCfg(DnCfg **cfg);
void dnodeCleanupCfg(DnCfg **cfg); void dnodeCleanupCfg(DnCfg **cfg);
void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data); void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data);
int32_t dnodeGetDnodeId(DnCfg *cfg); int32_t dnodeGetDnodeId(DnCfg *cfg);
......
...@@ -22,10 +22,9 @@ extern "C" { ...@@ -22,10 +22,9 @@ extern "C" {
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnCheck { typedef struct DnCheck {
Dnode *dnode;
} DnCheck; } DnCheck;
int32_t dnodeInitCheck(Dnode *dnode, DnCheck **check); int32_t dnodeInitCheck(DnCheck **check);
void dnodeCleanupCheck(DnCheck **check); void dnodeCleanupCheck(DnCheck **check);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -23,7 +23,6 @@ extern "C" { ...@@ -23,7 +23,6 @@ extern "C" {
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnEps { typedef struct DnEps {
Dnode * dnode;
int32_t dnodeId; int32_t dnodeId;
int32_t dnodeNum; int32_t dnodeNum;
SDnodeEp * dnodeList; SDnodeEp * dnodeList;
...@@ -32,11 +31,11 @@ typedef struct DnEps { ...@@ -32,11 +31,11 @@ typedef struct DnEps {
pthread_mutex_t mutex; pthread_mutex_t mutex;
} DnEps; } DnEps;
int32_t dnodeInitEps(Dnode *dnode, DnEps **eps); int32_t dnodeInitEps(DnEps **eps);
void dnodeCleanupEps(DnEps **eps); void dnodeCleanupEps(DnEps **eps);
void dnodeUpdateEps(DnEps *eps, SDnodeEps *data); void dnodeUpdateEps(DnEps *eps, SDnodeEps *data);
bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr); bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr);
void dnodeGetDnodeEp(Dnode *dnode, int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -52,6 +52,8 @@ typedef struct Dnode { ...@@ -52,6 +52,8 @@ typedef struct Dnode {
struct Vnode * vnode; struct Vnode * vnode;
} Dnode; } Dnode;
Dnode* dnodeInst();
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} #define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }}
......
...@@ -28,21 +28,20 @@ typedef enum { ...@@ -28,21 +28,20 @@ typedef enum {
} RunStat; } RunStat;
typedef struct DnMain { typedef struct DnMain {
Dnode * dnode;
RunStat runStatus; RunStat runStatus;
void * dnodeTimer; void * dnodeTimer;
SStartupStep startup; SStartupStep startup;
} DnMain; } DnMain;
int32_t dnodeInitMain(Dnode *dnode, DnMain **main); int32_t dnodeInitMain(DnMain **main);
void dnodeCleanupMain(DnMain **main); void dnodeCleanupMain(DnMain **main);
int32_t dnodeInitStorage(Dnode *dnode, void **unused); int32_t dnodeInitStorage();
void dnodeCleanupStorage(void **unused); void dnodeCleanupStorage();
void dnodeReportStartup(Dnode *dnode, char *name, char *desc); void dnodeReportStartup(char *name, char *desc);
void dnodeReportStartupFinished(Dnode *dnode, char *name, char *desc); void dnodeReportStartupFinished(char *name, char *desc);
void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg); void dnodeProcessStartupReq(SRpcMsg *pMsg);
void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg); void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
void dnodeProcessConfigDnodeReq(Dnode *dnode, SRpcMsg *pMsg); void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,20 +22,19 @@ extern "C" { ...@@ -22,20 +22,19 @@ extern "C" {
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnMnEps { typedef struct DnMnEps {
Dnode * dnode;
SRpcEpSet mnodeEpSet; SRpcEpSet mnodeEpSet;
SMInfos mnodeInfos; SMInfos mnodeInfos;
char file[PATH_MAX + 20]; char file[PATH_MAX + 20];
pthread_mutex_t mutex; pthread_mutex_t mutex;
} DnMnEps; } DnMnEps;
int32_t dnodeInitMnodeEps(Dnode *dnode, DnMnEps **meps); int32_t dnodeInitMnodeEps(DnMnEps **meps);
void dnodeCleanupMnodeEps(DnMnEps **meps); void dnodeCleanupMnodeEps(DnMnEps **meps);
void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *pMinfos); void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *pMinfos);
void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *pEpSet); void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *pEpSet);
void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet); void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet);
void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet); void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet);
void dnodeSendRedirectMsg(Dnode *dnode, SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,15 +22,14 @@ extern "C" { ...@@ -22,15 +22,14 @@ extern "C" {
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnStatus { typedef struct DnStatus {
Dnode * dnode;
void * dnodeTimer; void * dnodeTimer;
void * statusTimer; void * statusTimer;
uint32_t rebootTime; uint32_t rebootTime;
} DnStatus; } DnStatus;
int32_t dnodeInitStatus(Dnode *dnode, DnStatus **status); int32_t dnodeInitStatus(DnStatus **status);
void dnodeCleanupStatus(DnStatus **status); void dnodeCleanupStatus(DnStatus **status);
void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg); void dnodeProcessStatusRsp(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,7 +26,6 @@ extern "C" { ...@@ -26,7 +26,6 @@ extern "C" {
* thus we use pthread_mutex_t/pthread_cond_t to simulate * thus we use pthread_mutex_t/pthread_cond_t to simulate
*/ */
typedef struct DnTelem { typedef struct DnTelem {
Dnode * dnode;
bool enable; bool enable;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
...@@ -35,8 +34,8 @@ typedef struct DnTelem { ...@@ -35,8 +34,8 @@ typedef struct DnTelem {
char email[TSDB_FQDN_LEN]; char email[TSDB_FQDN_LEN];
} DnTelem; } DnTelem;
int32_t dnodeInitTelemetry(Dnode *dnode, DnTelem **telem); int32_t dnodeInitTelem(DnTelem **telem);
void dnodeCleanupTelemetry(DnTelem **telem); void dnodeCleanupTelem(DnTelem **telem);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,30 +21,24 @@ extern "C" { ...@@ -21,30 +21,24 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
typedef void (*RpcMsgCfp)(void *owner, SRpcMsg *pMsg, SRpcEpSet *pEpSet); typedef void (*RpcMsgFp)( SRpcMsg *pMsg);
typedef void (*RpcMsgFp)(void *owner, SRpcMsg *pMsg);
typedef struct DnMsgFp {
void * module;
RpcMsgFp fp;
} DnMsgFp;
typedef struct DnTrans { typedef struct DnTrans {
Dnode * dnode; void * serverRpc;
void * serverRpc; void * clientRpc;
void * clientRpc; void * shellRpc;
void * shellRpc; int32_t queryReqNum;
int32_t queryReqNum; int32_t submitReqNum;
int32_t submitReqNum; RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX];
DnMsgFp fpPeerMsg[TSDB_MSG_TYPE_MAX]; RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX];
DnMsgFp fpShellMsg[TSDB_MSG_TYPE_MAX];
} DnTrans; } DnTrans;
int32_t dnodeInitTrans(Dnode *dnode, DnTrans **trans); int32_t dnodeInitTrans(DnTrans **rans);
void dnodeCleanupTrans(DnTrans **trans); void dnodeCleanupTrans(DnTrans **trans);
void dnodeSendMsgToMnode(Dnode *dnode, SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(Dnode *dnode, SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(Dnode *dnode, SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -103,11 +103,10 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) { ...@@ -103,11 +103,10 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) {
return 0; return 0;
} }
int32_t dnodeInitCfg(Dnode *dnode, DnCfg **out) { int32_t dnodeInitCfg(DnCfg **out) {
DnCfg* cfg = calloc(1, sizeof(DnCfg)); DnCfg* cfg = calloc(1, sizeof(DnCfg));
if (cfg == NULL) return -1; if (cfg == NULL) return -1;
cfg->dnode = dnode;
cfg->dnodeId = 0; cfg->dnodeId = 0;
cfg->dropped = 0; cfg->dropped = 0;
cfg->clusterId[0] = 0; cfg->clusterId[0] = 0;
......
...@@ -145,11 +145,10 @@ static int32_t dnodeCheckAccess() { return 0; } ...@@ -145,11 +145,10 @@ static int32_t dnodeCheckAccess() { return 0; }
static int32_t dnodeCheckVersion() { return 0; } static int32_t dnodeCheckVersion() { return 0; }
static int32_t dnodeCheckDatafile() { return 0; } static int32_t dnodeCheckDatafile() { return 0; }
int32_t dnodeInitCheck(Dnode *dnode, DnCheck **out) { int32_t dnodeInitCheck(DnCheck **out) {
DnCheck *check = calloc(1, sizeof(DnCheck)); DnCheck *check = calloc(1, sizeof(DnCheck));
if (check == NULL) return -1; if (check == NULL) return -1;
check->dnode = dnode;
*out = check; *out = check;
if (dnodeCheckNetwork() != 0) { if (dnodeCheckNetwork() != 0) {
......
...@@ -182,15 +182,14 @@ static int32_t dnodeWriteEps(DnEps *eps) { ...@@ -182,15 +182,14 @@ static int32_t dnodeWriteEps(DnEps *eps) {
return 0; return 0;
} }
int32_t dnodeInitEps(Dnode *dnode, DnEps **out) { int32_t dnodeInitEps(DnEps **out) {
DnEps *eps = calloc(1, sizeof(DnEps)); DnEps *eps = calloc(1, sizeof(DnEps));
if (eps == NULL) return -1; if (eps == NULL) return -1;
eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (eps->dnodeHash == NULL) return -1; if (eps->dnodeHash == NULL) return -1;
eps->dnode = dnode; eps->dnodeId = dnodeInst()->cfg->dnodeId;
eps->dnodeId = dnode->cfg->dnodeId;
eps->dnodeNum = 0; eps->dnodeNum = 0;
snprintf(eps->file, sizeof(eps->file), "%s/dnodeEps.json", tsDnodeDir); snprintf(eps->file, sizeof(eps->file), "%s/dnodeEps.json", tsDnodeDir);
pthread_mutex_init(&eps->mutex, NULL); pthread_mutex_init(&eps->mutex, NULL);
...@@ -269,10 +268,8 @@ bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) { ...@@ -269,10 +268,8 @@ bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) {
return changed; return changed;
} }
void dnodeGetDnodeEp(Dnode *dnode, int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
assert(dnode != NULL); DnEps *eps = dnodeInst()->eps;
DnEps *eps = dnode->eps;
pthread_mutex_lock(&eps->mutex); pthread_mutex_lock(&eps->mutex);
SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t));
......
...@@ -28,202 +28,75 @@ ...@@ -28,202 +28,75 @@
#include "dnodeMain.h" #include "dnodeMain.h"
#include "dnodeMnodeEps.h" #include "dnodeMnodeEps.h"
#include "dnodeStatus.h" #include "dnodeStatus.h"
#include "dnodeTelemetry.h" #include "dnodeTelem.h"
#include "dnodeTrans.h" #include "dnodeTrans.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h" #include "vnode.h"
static int32_t dnodeInitRpcEnv(Dnode *dnode, void **unUsed) { return rpcInit(); } static Dnode tsDnode = {0};
static void dnodeCleanupRpcEnv(void **unUsed) { rpcCleanup(); }
#if 0 Dnode *dnodeInst() { return &tsDnode; }
static int32_t dnodeInitTfsEnv(Dnode *dnode, void **unUsed) { return tfInit(); }
static void dnodeCleanupTfsEnv(void **unUsed) { tfCleanup(); }
static int32_t dnodeInitScriptEnv(Dnode *dnode, void **unUsed) { return scriptEnvPoolInit(); }
static void dnodeCleanupScriptEnv(void **unUsed) { scriptEnvPoolCleanup(); }
static int32_t dnodeInitWalEnv(Dnode *dnode, void **unUsed) { return walInit(); }
static void dnodeCleanupWalEnv(void **unUsed) { walCleanUp(); }
static int32_t dnodeInitSyncEnv(Dnode *dnode, void **unUsed) { return syncInit(); }
static void dnodeCleanupSyncEnv(void **unUsed) { syncCleanUp(); }
#endif
static int32_t dnodeInitVnodeModule(Dnode *dnode, struct Vnode** out) { static int32_t dnodeInitVnodeModule(void **unused) {
SVnodePara para; SVnodePara para;
para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.GetDnodeEp = dnodeGetDnodeEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.dnode = dnode;
struct Vnode *vnode = vnodeCreateInstance(para);
if (vnode == NULL) {
return -1;
}
*out = vnode; return vnodeInit(para);
return 0;
} }
static void dnodeCleanupVnodeModule(Dnode *dnode, struct Vnode **out) { static int32_t dnodeInitMnodeModule(void **unused) {
struct Vnode *vnode = *out; Dnode *dnode = dnodeInst();
*out = NULL;
vnodeDropInstance(vnode);
}
static int32_t dnodeInitMnodeModule(Dnode *dnode, struct Mnode **out) {
SMnodePara para; SMnodePara para;
para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.GetDnodeEp = dnodeGetDnodeEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg; para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
para.dnode = dnode;
para.dnodeId = dnode->cfg->dnodeId; para.dnodeId = dnode->cfg->dnodeId;
strncpy(para.clusterId, dnode->cfg->clusterId, sizeof(para.clusterId)); strncpy(para.clusterId, dnode->cfg->clusterId, sizeof(para.clusterId));
struct Mnode *mnode = mnodeCreateInstance(para); return mnodeInit(para);
if (mnode == NULL) {
return -1;
}
*out = mnode;
return 0;
}
static void dnodeCleanupMnodeModule(Dnode *dnode, struct Mnode **out) {
struct Mnode *mnode = *out;
*out = NULL;
mnodeDropInstance(mnode);
} }
Dnode *dnodeCreateInstance() { int32_t dnodeInit() {
Dnode *dnode = calloc(1, sizeof(Dnode)); struct SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (dnode == NULL) { if (steps == NULL) return -1;
return NULL;
} Dnode *dnode = dnodeInst();
SSteps *steps = taosStepInit(24); taosStepAdd(steps, "dnode-main", (void **)&dnode->main, (InitFp)dnodeInitMain, (CleanupFp)dnodeCleanupMain);
if (steps == NULL) { taosStepAdd(steps, "dnode-storage", NULL, (InitFp)dnodeInitStorage, (CleanupFp)dnodeCleanupStorage);
return NULL; //taosStepAdd(steps, "dnode-tfs-env", NULL, (InitFp)tfInit, (CleanupFp)tfCleanup);
} taosStepAdd(steps, "dnode-rpc-env", NULL, (InitFp)rpcInit, (CleanupFp)rpcCleanup);
taosStepAdd(steps, "dnode-check", (void **)&dnode->check, (InitFp)dnodeInitCheck, (CleanupFp)dnodeCleanupCheck);
SStepObj step = {0}; taosStepAdd(steps, "dnode-cfg", (void **)&dnode->cfg, (InitFp)dnodeInitCfg, (CleanupFp)dnodeCleanupCfg);
step.parent = dnode; taosStepAdd(steps, "dnode-deps", (void **)&dnode->eps, (InitFp)dnodeInitEps, (CleanupFp)dnodeCleanupEps);
taosStepAdd(steps, "dnode-meps", (void **)&dnode->meps, (InitFp)dnodeInitMnodeEps, (CleanupFp)dnodeCleanupMnodeEps);
step.name = "dnode-main"; //taosStepAdd(steps, "dnode-wal", NULL, (InitFp)walInit, (CleanupFp)walCleanUp);
step.self = (void **)&dnode->main; //taosStepAdd(steps, "dnode-sync", NULL, (InitFp)syncInit, (CleanupFp)syncCleanUp);
step.initFp = (FnInitObj)dnodeInitMain; taosStepAdd(steps, "dnode-vnode", NULL, (InitFp)dnodeInitVnodeModule, (CleanupFp)vnodeCleanup);
step.cleanupFp = (FnCleanupObj)dnodeCleanupMain; taosStepAdd(steps, "dnode-mnode", NULL, (InitFp)dnodeInitMnodeModule, (CleanupFp)mnodeCleanup);
step.reportFp = NULL; taosStepAdd(steps, "dnode-trans", (void **)&dnode->trans, (InitFp)dnodeInitTrans, (CleanupFp)dnodeCleanupTrans);
taosStepAdd(steps, &step); taosStepAdd(steps, "dnode-status", (void **)&dnode->status, (InitFp)dnodeInitStatus, (CleanupFp)dnodeCleanupStatus);
taosStepAdd(steps, "dnode-telem", (void **)&dnode->meps, (InitFp)dnodeInitTelem, (CleanupFp)dnodeCleanupTelem);
step.name = "dnode-storage"; //taosStepAdd(steps, "dnode-script", NULL, (InitFp)scriptEnvPoolInit, (CleanupFp)scriptEnvPoolCleanup);
step.self = NULL;
step.initFp = (FnInitObj)dnodeInitStorage;
step.cleanupFp = (FnCleanupObj)dnodeCleanupStorage;
step.reportFp = (FnReportProgress)dnodeReportStartup;
taosStepAdd(steps, &step);
#if 0
step.name = "dnode-tfs-env";
step.self = NULL;
step.initFp = (FnInitObj)dnodeInitTfsEnv;
step.cleanupFp = (FnCleanupObj)dnodeCleanupTfsEnv;
taosStepAdd(steps, &step);
#endif
step.name = "dnode-rpc-env";
step.self = NULL;
step.initFp = (FnInitObj)dnodeInitRpcEnv;
step.cleanupFp = (FnCleanupObj)dnodeCleanupRpcEnv;
taosStepAdd(steps, &step);
step.name = "dnode-check";
step.self = (void **)&dnode->check;
step.initFp = (FnInitObj)dnodeInitCheck;
step.cleanupFp = (FnCleanupObj)dnodeCleanupCheck;
taosStepAdd(steps, &step);
step.name = "dnode-cfg";
step.self = (void **)&dnode->cfg;
step.initFp = (FnInitObj)dnodeInitCfg;
step.cleanupFp = (FnCleanupObj)dnodeCleanupCfg;
taosStepAdd(steps, &step);
step.name = "dnode-deps";
step.self = (void **)&dnode->eps;
step.initFp = (FnInitObj)dnodeInitEps;
step.cleanupFp = (FnCleanupObj)dnodeCleanupEps;
taosStepAdd(steps, &step);
step.name = "dnode-meps";
step.self = (void **)&dnode->meps;
step.initFp = (FnInitObj)dnodeInitMnodeEps;
step.cleanupFp = (FnCleanupObj)dnodeCleanupMnodeEps;
taosStepAdd(steps, &step);
#if 0
step.name = "dnode-wal";
step.self = NULL;
step.initFp = (FnInitObj)dnodeInitWalEnv;
step.cleanupFp = (FnCleanupObj)dnodeCleanupWalEnv;
taosStepAdd(steps, &step);
step.name = "dnode-sync";
step.self = NULL;
step.initFp = (FnInitObj)dnodeInitSyncEnv;
step.cleanupFp = (FnCleanupObj)dnodeCleanupSyncEnv;
taosStepAdd(steps, &step);
#endif
step.name = "dnode-vnode";
step.self = (void **)&dnode->vnode;
step.initFp = (FnInitObj)dnodeInitVnodeModule;
step.cleanupFp = (FnCleanupObj)dnodeCleanupVnodeModule;
taosStepAdd(steps, &step);
step.name = "dnode-mnode";
step.self = (void **)&dnode->mnode;
step.initFp = (FnInitObj)dnodeInitMnodeModule;
step.cleanupFp = (FnCleanupObj)dnodeCleanupMnodeModule;
taosStepAdd(steps, &step);
step.name = "dnode-trans";
step.self = (void **)&dnode->trans;
step.initFp = (FnInitObj)dnodeInitTrans;
step.cleanupFp = (FnCleanupObj)dnodeCleanupTrans;
taosStepAdd(steps, &step);
step.name = "dnode-status";
step.self = (void **)&dnode->status;
step.initFp = (FnInitObj)dnodeInitStatus;
step.cleanupFp = (FnCleanupObj)dnodeCleanupStatus;
taosStepAdd(steps, &step);
step.name = "dnode-telem";
step.self = (void **)&dnode->telem;
step.initFp = (FnInitObj)dnodeInitTelemetry;
step.cleanupFp = (FnCleanupObj)dnodeCleanupTelemetry;
taosStepAdd(steps, &step);
#if 0
step.name = "dnode-script";
step.self = NULL;
step.initFp = (FnInitObj)dnodeInitScriptEnv;
step.cleanupFp = (FnCleanupObj)dnodeCleanupScriptEnv;
taosStepAdd(steps, &step);
#endif
dnode->steps = steps; dnode->steps = steps;
taosStepExec(dnode->steps); taosStepExec(dnode->steps);
if (dnode->main) { if (dnode->main) {
dnode->main->runStatus = TD_RUN_STAT_RUNNING; dnode->main->runStatus = TD_RUN_STAT_RUNNING;
dnodeReportStartupFinished(dnode, "TDengine", "initialized successfully"); dnodeReportStartupFinished("TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
} }
return dnode; return 0;
} }
void dnodeDropInstance(Dnode *dnode) { void dnodeCleanup() {
Dnode *dnode = dnodeInst();
if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) { if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) {
dnode->main->runStatus = TD_RUN_STAT_STOPPED; dnode->main->runStatus = TD_RUN_STAT_STOPPED;
taosStepCleanup(dnode->steps); taosStepCleanup(dnode->steps);
......
...@@ -55,11 +55,10 @@ void dnodePrintDiskInfo() { ...@@ -55,11 +55,10 @@ void dnodePrintDiskInfo() {
dInfo("=================================="); dInfo("==================================");
} }
int32_t dnodeInitMain(Dnode *dnode, DnMain **out) { int32_t dnodeInitMain(DnMain **out) {
DnMain* main = calloc(1, sizeof(DnMain)); DnMain* main = calloc(1, sizeof(DnMain));
if (main == NULL) return -1; if (main == NULL) return -1;
main->dnode = dnode;
main->runStatus = TD_RUN_STAT_STOPPED; main->runStatus = TD_RUN_STAT_STOPPED;
main->dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); main->dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR");
if (main->dnodeTimer == NULL) { if (main->dnodeTimer == NULL) {
...@@ -75,9 +74,8 @@ int32_t dnodeInitMain(Dnode *dnode, DnMain **out) { ...@@ -75,9 +74,8 @@ int32_t dnodeInitMain(Dnode *dnode, DnMain **out) {
taosResolveCRC(); taosResolveCRC();
taosInitGlobalCfg(); taosInitGlobalCfg();
taosReadGlobalLogCfg(); taosReadGlobalLogCfg();
#if 0
taosSetCoreDump(tsEnableCoreFile); taosSetCoreDump(tsEnableCoreFile);
#endif
if (!taosMkDir(tsLogDir)) { if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
...@@ -121,7 +119,7 @@ void dnodeCleanupMain(DnMain **out) { ...@@ -121,7 +119,7 @@ void dnodeCleanupMain(DnMain **out) {
free(main); free(main);
} }
int32_t dnodeInitStorage(Dnode *dnode, void **m) { int32_t dnodeInitStorage() {
#ifdef TD_TSZ #ifdef TD_TSZ
// compress module init // compress module init
tsCompressInit(); tsCompressInit();
...@@ -191,7 +189,7 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) { ...@@ -191,7 +189,7 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) {
return 0; return 0;
} }
void dnodeCleanupStorage(void **m) { void dnodeCleanupStorage() {
#if 0 #if 0
// storage destroy // storage destroy
tfsDestroy(); tfsDestroy();
...@@ -203,23 +201,26 @@ void dnodeCleanupStorage(void **m) { ...@@ -203,23 +201,26 @@ void dnodeCleanupStorage(void **m) {
#endif #endif
} }
void dnodeReportStartup(Dnode *dnode, char *name, char *desc) { void dnodeReportStartup(char *name, char *desc) {
Dnode *dnode = dnodeInst();
SStartupStep *startup = &dnode->main->startup; SStartupStep *startup = &dnode->main->startup;
tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc)); tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 0; startup->finished = 0;
} }
void dnodeReportStartupFinished(Dnode *dnode, char *name, char *desc) { void dnodeReportStartupFinished(char *name, char *desc) {
Dnode *dnode = dnodeInst();
SStartupStep *startup = &dnode->main->startup; SStartupStep *startup = &dnode->main->startup;
tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc)); tstrncpy(startup->desc, desc, strlen(startup->desc));
startup->finished = 1; startup->finished = 1;
} }
void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg) { void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
Dnode *dnode = dnodeInst();
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep)); memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep));
...@@ -230,10 +231,11 @@ void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg) { ...@@ -230,10 +231,11 @@ void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static int32_t dnodeStartMnode(Dnode *dnode, SRpcMsg *pMsg) { static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
Dnode *dnode = dnodeInst();
SCreateMnodeMsg *pCfg = pMsg->pCont; SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnodeGetDnodeId(dnode->cfg)) { if (pCfg->dnodeId != dnode->cfg->dnodeId) {
dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId,
dnodeGetDnodeId(dnode->cfg)); dnodeGetDnodeId(dnode->cfg));
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
...@@ -252,11 +254,11 @@ static int32_t dnodeStartMnode(Dnode *dnode, SRpcMsg *pMsg) { ...@@ -252,11 +254,11 @@ static int32_t dnodeStartMnode(Dnode *dnode, SRpcMsg *pMsg) {
if (mnodeIsServing(dnode->mnode)) return 0; if (mnodeIsServing(dnode->mnode)) return 0;
return mnodeDeploy(dnode->mnode, &pCfg->mnodes); return mnodeDeploy(&pCfg->mnodes);
} }
void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg) { void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
int32_t code = dnodeStartMnode(dnode, pMsg); int32_t code = dnodeStartMnode(pMsg);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
...@@ -264,7 +266,7 @@ void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg) { ...@@ -264,7 +266,7 @@ void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
void dnodeProcessConfigDnodeReq(Dnode *dnode, SRpcMsg *pMsg) { void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
SCfgDnodeMsg *pCfg = pMsg->pCont; SCfgDnodeMsg *pCfg = pMsg->pCont;
int32_t code = taosCfgDynamicOptions(pCfg->config); int32_t code = taosCfgDynamicOptions(pCfg->config);
......
...@@ -179,7 +179,7 @@ PARSE_MINFOS_OVER: ...@@ -179,7 +179,7 @@ PARSE_MINFOS_OVER:
for (int32_t i = 0; i < mInfos.mnodeNum; ++i) { for (int32_t i = 0; i < mInfos.mnodeNum; ++i) {
SMInfo *mInfo = &mInfos.mnodeInfos[i]; SMInfo *mInfo = &mInfos.mnodeInfos[i];
dnodeGetDnodeEp(meps->dnode, mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); dnodeGetDnodeEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL);
} }
dnodeResetMnodeEps(meps, &mInfos); dnodeResetMnodeEps(meps, &mInfos);
...@@ -191,8 +191,8 @@ PARSE_MINFOS_OVER: ...@@ -191,8 +191,8 @@ PARSE_MINFOS_OVER:
return 0; return 0;
} }
void dnodeSendRedirectMsg(struct Dnode *dnode, SRpcMsg *rpcMsg, bool forShell) { void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
DnMnEps *meps = dnode->meps; DnMnEps *meps = dnodeInst()->meps;
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo); rpcGetConnInfo(rpcMsg->handle, &connInfo);
...@@ -222,17 +222,16 @@ void dnodeSendRedirectMsg(struct Dnode *dnode, SRpcMsg *rpcMsg, bool forShell) { ...@@ -222,17 +222,16 @@ void dnodeSendRedirectMsg(struct Dnode *dnode, SRpcMsg *rpcMsg, bool forShell) {
rpcSendRedirectRsp(rpcMsg->handle, &epSet); rpcSendRedirectRsp(rpcMsg->handle, &epSet);
} }
int32_t dnodeInitMnodeEps(Dnode *dnode, DnMnEps **out) { int32_t dnodeInitMnodeEps(DnMnEps **out) {
DnMnEps *meps = calloc(1, sizeof(DnMnEps)); DnMnEps *meps = calloc(1, sizeof(DnMnEps));
if (meps == NULL) return -1; if (meps == NULL) return -1;
meps->dnode = dnode;
snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir); snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir);
pthread_mutex_init(&meps->mutex, NULL); pthread_mutex_init(&meps->mutex, NULL);
*out = meps; *out = meps;
dnodeResetMnodeEps(meps, NULL); dnodeResetMnodeEps(meps, NULL);
int32_t ret = dnodeReadMnodeEps(meps, dnode->eps); int32_t ret = dnodeReadMnodeEps(meps, dnodeInst()->eps);
if (ret == 0) { if (ret == 0) {
dInfo("dnode mInfos is initialized"); dInfo("dnode mInfos is initialized");
} }
......
...@@ -46,7 +46,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -46,7 +46,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return; return;
} }
Dnode *dnode = status->dnode; Dnode *dnode = dnodeInst();
dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId); dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg)); pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg));
pStatus->version = htonl(tsVersion); pStatus->version = htonl(tsVersion);
...@@ -76,16 +76,17 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { ...@@ -76,16 +76,17 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
pStatus->clusterCfg.slaveQuery = tsEnableSlaveQuery; pStatus->clusterCfg.slaveQuery = tsEnableSlaveQuery;
pStatus->clusterCfg.adjustMaster = tsEnableAdjustMaster; pStatus->clusterCfg.adjustMaster = tsEnableAdjustMaster;
vnodeGetStatus(dnode->vnode, pStatus); vnodeGetStatus(pStatus);
contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes); pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {.ahandle = status, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; SRpcMsg rpcMsg = {.ahandle = status, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS};
dnodeSendMsgToMnode(status->dnode, &rpcMsg); dnodeSendMsgToMnode(&rpcMsg);
} }
void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) { void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
Dnode *dnode = dnodeInst();
DnStatus *status = pMsg->ahandle; DnStatus *status = pMsg->ahandle;
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
...@@ -114,7 +115,7 @@ void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) { ...@@ -114,7 +115,7 @@ void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) {
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
dnodeUpdateCfg(dnode->cfg, pCfg); dnodeUpdateCfg(dnode->cfg, pCfg);
vnodeSetAccess(dnode->vnode, pStatusRsp->vgAccess, pCfg->numOfVnodes); vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
dnodeUpdateEps(dnode->eps, pEps); dnodeUpdateEps(dnode->eps, pEps);
...@@ -122,17 +123,14 @@ void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) { ...@@ -122,17 +123,14 @@ void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer);
} }
int32_t dnodeInitStatus(Dnode *dnode, DnStatus **out) { int32_t dnodeInitStatus(DnStatus **out) {
DnStatus *status = calloc(1, sizeof(DnStatus)); DnStatus *status = calloc(1, sizeof(DnStatus));
if (status == NULL) return -1; if (status == NULL) return -1;
status->dnode = dnode;
status->statusTimer = NULL; status->statusTimer = NULL;
status->dnodeTimer = dnode->main->dnodeTimer; status->dnodeTimer = dnodeInst()->main->dnodeTimer;
status->rebootTime = taosGetTimestampSec(); status->rebootTime = taosGetTimestampSec();
taosTmrReset(dnodeSendStatusMsg, 500, status, status->dnodeTimer, &status->statusTimer); taosTmrReset(dnodeSendStatusMsg, 500, status, status->dnodeTimer, &status->statusTimer);
*out = status; *out = status;
dInfo("dnode status timer is initialized"); dInfo("dnode status timer is initialized");
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tbuffer.h" #include "tbuffer.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnodeCfg.h" #include "dnodeCfg.h"
#include "dnodeTelemetry.h" #include "dnodeTelem.h"
#include "mnode.h" #include "mnode.h"
#define TELEMETRY_SERVER "telemetry.taosdata.com" #define TELEMETRY_SERVER "telemetry.taosdata.com"
...@@ -163,7 +163,7 @@ static void dnodeAddVersionInfo(DnTelem* telem, SBufferWriter* bw) { ...@@ -163,7 +163,7 @@ static void dnodeAddVersionInfo(DnTelem* telem, SBufferWriter* bw) {
static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) { static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) {
SMnodeStat stat = {0}; SMnodeStat stat = {0};
if (mnodeGetStatistics(telem->dnode->mnode, &stat) != 0) { if (mnodeGetStatistics(&stat) != 0) {
return; return;
} }
...@@ -192,9 +192,10 @@ static void dnodeSendTelemetryReport(DnTelem* telem) { ...@@ -192,9 +192,10 @@ static void dnodeSendTelemetryReport(DnTelem* telem) {
return; return;
} }
Dnode *dnode = dnodeInst();
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
dnodeBeginObject(&bw); dnodeBeginObject(&bw);
dnodeAddStringField(&bw, "instanceId", telem->dnode->cfg->clusterId); dnodeAddStringField(&bw, "instanceId", dnode->cfg->clusterId);
dnodeAddIntField(&bw, "reportVersion", 1); dnodeAddIntField(&bw, "reportVersion", 1);
dnodeAddOsInfo(&bw); dnodeAddOsInfo(&bw);
dnodeAddCpuInfo(&bw); dnodeAddCpuInfo(&bw);
...@@ -243,7 +244,7 @@ static void* dnodeTelemThreadFp(void* param) { ...@@ -243,7 +244,7 @@ static void* dnodeTelemThreadFp(void* param) {
if (r == 0) break; if (r == 0) break;
if (r != ETIMEDOUT) continue; if (r != ETIMEDOUT) continue;
if (mnodeIsServing(telem->dnode->mnode)) { if (mnodeIsServing()) {
dnodeSendTelemetryReport(telem); dnodeSendTelemetryReport(telem);
} }
end.tv_sec += REPORT_INTERVAL; end.tv_sec += REPORT_INTERVAL;
...@@ -265,11 +266,10 @@ static void dnodeGetEmail(DnTelem* telem, char* filepath) { ...@@ -265,11 +266,10 @@ static void dnodeGetEmail(DnTelem* telem, char* filepath) {
taosCloseFile(fd); taosCloseFile(fd);
} }
int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) { int32_t dnodeInitTelem(DnTelem** out) {
DnTelem* telem = calloc(1, sizeof(DnTelem)); DnTelem* telem = calloc(1, sizeof(DnTelem));
if (telem == NULL) return -1; if (telem == NULL) return -1;
telem->dnode = dnode;
telem->enable = tsEnableTelemetryReporting; telem->enable = tsEnableTelemetryReporting;
*out = telem; *out = telem;
...@@ -296,7 +296,7 @@ int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) { ...@@ -296,7 +296,7 @@ int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) {
return 0; return 0;
} }
void dnodeCleanupTelemetry(DnTelem** out) { void dnodeCleanupTelem(DnTelem** out) {
DnTelem* telem = *out; DnTelem* telem = *out;
*out = NULL; *out = NULL;
......
...@@ -29,13 +29,13 @@ ...@@ -29,13 +29,13 @@
#include "vnode.h" #include "vnode.h"
#include "mnode.h" #include "mnode.h"
static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode * dnode = trans->dnode; Dnode * dnode = dnodeInst();
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dnodeProcessStartupReq(dnode, pMsg); dnodeProcessStartupReq(pMsg);
return; return;
} }
...@@ -53,9 +53,9 @@ static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet ...@@ -53,9 +53,9 @@ static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet
return; return;
} }
DnMsgFp fp = trans->fpPeerMsg[pMsg->msgType]; RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType];
if (fp.fp != NULL) { if (fp != NULL) {
(*fp.fp)(fp.module, pMsg); (*fp)(pMsg);
} else { } else {
dDebug("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); dDebug("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
...@@ -65,41 +65,36 @@ static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet ...@@ -65,41 +65,36 @@ static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet
} }
int32_t dnodeInitServer(DnTrans *trans) { int32_t dnodeInitServer(DnTrans *trans) {
struct Dnode *dnode = trans->dnode; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg;
struct Vnode *vnode = dnode->vnode; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg;
struct Mnode *mnode = dnode->mnode; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_TABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_TABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_TABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_STABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_SYNC_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_STREAM] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessConfigDnodeReq}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_MNODE] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessCreateMnodeReq}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_AUTH] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_GRANT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_STATUS] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeDnodePort; rpcInit.localPort = tsDnodeDnodePort;
rpcInit.label = "DND-S"; rpcInit.label = "DND-S";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcMsgCfp)dnodeProcessPeerReq; rpcInit.cfp = dnodeProcessPeerReq;
rpcInit.sessions = TSDB_MAX_VNODES << 4; rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.owner = trans;
trans->serverRpc = rpcOpen(&rpcInit); trans->serverRpc = rpcOpen(&rpcInit);
if (trans->serverRpc == NULL) { if (trans->serverRpc == NULL) {
...@@ -119,8 +114,8 @@ void dnodeCleanupServer(DnTrans *trans) { ...@@ -119,8 +114,8 @@ void dnodeCleanupServer(DnTrans *trans) {
} }
} }
static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode *dnode = trans->dnode; Dnode *dnode = dnodeInst();
if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) { if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return; if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("msg:%p is ignored since dnode is stopping", pMsg); dTrace("msg:%p is ignored since dnode is stopping", pMsg);
...@@ -132,9 +127,9 @@ static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pE ...@@ -132,9 +127,9 @@ static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pE
dnodeUpdateMnodeFromPeer(dnode->meps, pEpSet); dnodeUpdateMnodeFromPeer(dnode->meps, pEpSet);
} }
DnMsgFp fp = trans->fpPeerMsg[pMsg->msgType]; RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType];
if (fp.fp != NULL) { if (fp != NULL) {
(*fp.fp)(fp.module, pMsg); (*fp)(pMsg);
} else { } else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
...@@ -147,43 +142,39 @@ static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pE ...@@ -147,43 +142,39 @@ static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pE
} }
int32_t dnodeInitClient(DnTrans *trans) { int32_t dnodeInitClient(DnTrans *trans) {
struct Dnode *dnode = trans->dnode; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg;
struct Mnode *mnode = dnode->mnode; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg;
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_AUTH_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_GRANT_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg};
trans->fpPeerMsg[TSDB_MSG_TYPE_DM_STATUS_RSP] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessStatusRsp};
char secret[TSDB_KEY_LEN] = "secret"; char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = (RpcMsgCfp)dnodeProcessRspFromPeer; rpcInit.cfp = dnodeProcessRspFromPeer;
rpcInit.sessions = TSDB_MAX_VNODES << 4; rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "t"; rpcInit.user = "t";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.secret = secret; rpcInit.secret = secret;
rpcInit.owner = trans;
trans->clientRpc = rpcOpen(&rpcInit); trans->clientRpc = rpcOpen(&rpcInit);
if (trans->clientRpc == NULL) { if (trans->clientRpc == NULL) {
...@@ -203,8 +194,8 @@ void dnodeCleanupClient(DnTrans *trans) { ...@@ -203,8 +194,8 @@ void dnodeCleanupClient(DnTrans *trans) {
} }
} }
static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode * dnode = trans->dnode; Dnode * dnode = dnodeInst();
SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
...@@ -222,15 +213,16 @@ static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *p ...@@ -222,15 +213,16 @@ static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *p
return; return;
} }
DnTrans *trans = dnode->trans;
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&trans->queryReqNum, 1); atomic_fetch_add_32(&trans->queryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_32(&trans->submitReqNum, 1); atomic_fetch_add_32(&trans->submitReqNum, 1);
} else {} } else {}
DnMsgFp fp = trans->fpShellMsg[pMsg->msgType]; RpcMsgFp fp = trans->shellMsgFp[pMsg->msgType];
if (fp.fp != NULL) { if (fp != NULL) {
(*fp.fp)(fp.module, pMsg); (*fp)(pMsg);
} else { } else {
dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
...@@ -254,32 +246,34 @@ static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secr ...@@ -254,32 +246,34 @@ static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secr
return -1; return -1;
} }
void dnodeSendMsgToDnode(Dnode *dnode, SRpcEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
Dnode *dnode = dnodeInst();
rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL); rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL);
} }
void dnodeSendMsgToMnode(Dnode *dnode, SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
Dnode * dnode = dnodeInst();
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(dnode->meps, &epSet); dnodeGetEpSetForPeer(dnode->meps, &epSet);
dnodeSendMsgToDnode(dnode, &epSet, rpcMsg); dnodeSendMsgToDnode(&epSet, rpcMsg);
} }
void dnodeSendMsgToMnodeRecv(Dnode *dnode, SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
Dnode * dnode = dnodeInst();
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(dnode->meps, &epSet); dnodeGetEpSetForPeer(dnode->meps, &epSet);
rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp);
} }
void dnodeSendMsgToDnodeRecv(Dnode *dnode, SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
Dnode *dnode = dnodeInst();
rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp); rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp);
} }
static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
DnTrans *trans = owner;
if (dnodeAuthNetTest(user, spi, encrypt, secret, ckey) == 0) return 0; if (dnodeAuthNetTest(user, spi, encrypt, secret, ckey) == 0) return 0;
int32_t code = mnodeRetriveAuth(trans->dnode->mnode, user, spi, encrypt, secret, ckey); int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code; if (code != TSDB_CODE_APP_NOT_READY) return code;
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
...@@ -292,7 +286,7 @@ static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, cha ...@@ -292,7 +286,7 @@ static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, cha
dDebug("user:%s, send auth msg to mnodes", user); dDebug("user:%s, send auth msg to mnodes", user);
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dnodeSendMsgToMnodeRecv(trans->dnode, &rpcMsg, &rpcRsp); dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
...@@ -310,55 +304,51 @@ static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, cha ...@@ -310,55 +304,51 @@ static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, cha
} }
int32_t dnodeInitShell(DnTrans *trans) { int32_t dnodeInitShell(DnTrans *trans) {
struct Dnode *dnode = trans->dnode; trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
struct Vnode *vnode = dnode->vnode; trans->shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
struct Mnode *mnode = dnode->mnode; trans->shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
trans->shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_SUBMIT] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
trans->fpShellMsg[TSDB_MSG_TYPE_QUERY] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
trans->fpShellMsg[TSDB_MSG_TYPE_FETCH] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
trans->fpShellMsg[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg};
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_ACCT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_ACCT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_ACCT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_USER] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_USER] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_USER] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_DNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_DNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_TP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_SYNC_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_TP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_TP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_STREAM] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_KILL_QUERY] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_KILL_STREAM] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_KILL_CONN] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg;
// the following message shall be treated as mnode query // the following message shall be treated as mnode query
trans->fpShellMsg[TSDB_MSG_TYPE_CM_HEARTBEAT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_CONNECT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_USE_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_TABLE_META] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_TABLES_META] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_SHOW] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_RETRIEVE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg;
trans->fpShellMsg[TSDB_MSG_TYPE_NETWORK_TEST] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessStartupReq}; trans->shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
...@@ -370,12 +360,11 @@ int32_t dnodeInitShell(DnTrans *trans) { ...@@ -370,12 +360,11 @@ int32_t dnodeInitShell(DnTrans *trans) {
rpcInit.localPort = tsDnodeShellPort; rpcInit.localPort = tsDnodeShellPort;
rpcInit.label = "SHELL"; rpcInit.label = "SHELL";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = (RpcMsgCfp)dnodeProcessMsgFromShell; rpcInit.cfp = dnodeProcessMsgFromShell;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = dnodeRetrieveUserAuthInfo; rpcInit.afp = dnodeRetrieveUserAuthInfo;
rpcInit.owner = trans;
trans->shellRpc = rpcOpen(&rpcInit); trans->shellRpc = rpcOpen(&rpcInit);
if (trans->shellRpc == NULL) { if (trans->shellRpc == NULL) {
...@@ -394,11 +383,10 @@ void dnodeCleanupShell(DnTrans *trans) { ...@@ -394,11 +383,10 @@ void dnodeCleanupShell(DnTrans *trans) {
} }
} }
int32_t dnodeInitTrans(Dnode *dnode, DnTrans **out) { int32_t dnodeInitTrans(DnTrans **out) {
DnTrans* trans = calloc(1, sizeof(DnTrans)); DnTrans *trans = calloc(1, sizeof(DnTrans));
if (trans == NULL) return -1; if (trans == NULL) return -1;
trans->dnode = dnode;
*out = trans; *out = trans;
if (dnodeInitClient(trans) != 0) { if (dnodeInitClient(trans) != 0) {
......
...@@ -15,22 +15,18 @@ ...@@ -15,22 +15,18 @@
#include "mnodeInt.h" #include "mnodeInt.h"
struct Mnode *mnodeCreateInstance(SMnodePara para) { int32_t mnodeInit(SMnodePara para) { return 0; }
return NULL;
}
void mnodeDropInstance(struct Mnode *vnode) {} void mnodeCleanup() {}
int32_t mnodeDeploy(struct Mnode *mnode, struct SMInfos *minfos) { return 0; } int32_t mnodeDeploy(struct SMInfos *minfos) { return 0; }
void mnodeUnDeploy(struct Mnode *mnode) {} void mnodeUnDeploy() {}
bool mnodeIsServing(struct Mnode *mnode) { return false; } bool mnodeIsServing() { return false; }
int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat) { return 0; } int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; }
int32_t mnodeRetriveAuth(struct Mnode *mnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; }
return 0;
}
void mnodeProcessMsg(struct Mnode *mnode, SRpcMsg *rpcMsg) {} void mnodeProcessMsg(SRpcMsg *rpcMsg) {}
...@@ -29,8 +29,8 @@ static void setSignalHandler() { ...@@ -29,8 +29,8 @@ static void setSignalHandler() {
int main(int argc, char const *argv[]) { int main(int argc, char const *argv[]) {
setSignalHandler(); setSignalHandler();
struct Dnode *dnode = dnodeCreateInstance(); int32_t code = dnodeInit();
if (dnode == NULL) { if (code != 0) {
uInfo("Failed to start TDengine, please check the log at:%s", tsLogDir); uInfo("Failed to start TDengine, please check the log at:%s", tsLogDir);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
...@@ -42,7 +42,7 @@ int main(int argc, char const *argv[]) { ...@@ -42,7 +42,7 @@ int main(int argc, char const *argv[]) {
} }
uInfo("TDengine is shut down!"); uInfo("TDengine is shut down!");
dnodeDropInstance(dnode); dnodeCleanup();
return 0; return 0;
} }
...@@ -15,16 +15,14 @@ ...@@ -15,16 +15,14 @@
#include "vnodeInt.h" #include "vnodeInt.h"
struct Vnode *vnodeCreateInstance(SVnodePara para) { int32_t vnodeInit(SVnodePara para) { return 0; }
return NULL;
}
void vnodeDropInstance(struct Vnode *vnode) {} void vnodeCleanup() {}
int32_t vnodeGetStatistics(struct Vnode *vnode, SVnodeStat *stat) { return 0; } int32_t vnodeGetStatistics(SVnodeStat *stat) { return 0; }
void vnodeGetStatus(struct Vnode *vnode, struct SStatusMsg *status) {} void vnodeGetStatus(struct SStatusMsg *status) {}
void vnodeSetAccess(struct Vnode *vnode, struct SVgroupAccess *access, int32_t numOfVnodes) {} void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes) {}
void vnodeProcessMsg(struct Vnode *vnode, SRpcMsg *msg) {} void vnodeProcessMsg(SRpcMsg *msg) {}
...@@ -18,32 +18,47 @@ ...@@ -18,32 +18,47 @@
#include "ulog.h" #include "ulog.h"
#include "tstep.h" #include "tstep.h"
SSteps *taosStepInit(int32_t maxsize) { typedef struct SStepObj {
char * name;
void ** self;
InitFp initFp;
CleanupFp cleanupFp;
} SStep;
typedef struct SSteps {
int32_t cursize;
int32_t maxsize;
SStep * steps;
ReportFp reportFp;
} SSteps;
SSteps *taosStepInit(int32_t maxsize, ReportFp fp) {
SSteps *steps = calloc(1, sizeof(SSteps)); SSteps *steps = calloc(1, sizeof(SSteps));
if (steps == NULL) return NULL; if (steps == NULL) return NULL;
steps->maxsize = maxsize; steps->maxsize = maxsize;
steps->cursize = 0; steps->cursize = 0;
steps->steps = calloc(maxsize, sizeof(SStepObj)); steps->steps = calloc(maxsize, sizeof(SStep));
steps->reportFp = fp;
return steps; return steps;
} }
int32_t taosStepAdd(SSteps *steps, SStepObj *step) { int32_t taosStepAdd(struct SSteps *steps, char *name, void **obj, InitFp initFp, CleanupFp cleanupFp) {
if (steps == NULL) return - 1; if (steps == NULL) return -1;
if (steps->cursize >= steps->maxsize) { if (steps->cursize >= steps->maxsize) {
uError("failed to add step since up to the maxsize"); uError("failed to add step since up to the maxsize");
return -1; return -1;
} }
steps->steps[steps->cursize++] = *step; SStep step = {.name = name, .self = obj, .initFp = initFp, .cleanupFp = cleanupFp};
steps->steps[steps->cursize++] = step;
return 0; return 0;
} }
static void taosStepCleanupImp(SSteps *steps, int32_t pos) { static void taosStepCleanupImp(SSteps *steps, int32_t pos) {
for (int32_t s = pos; s >= 0; s--) { for (int32_t s = pos; s >= 0; s--) {
SStepObj *step = steps->steps + s; SStep *step = steps->steps + s;
uDebug("step:%s will cleanup", step->name); uDebug("step:%s will cleanup", step->name);
if (step->cleanupFp != NULL) { if (step->cleanupFp != NULL) {
(*step->cleanupFp)(step->self); (*step->cleanupFp)(step->self);
...@@ -55,14 +70,14 @@ int32_t taosStepExec(SSteps *steps) { ...@@ -55,14 +70,14 @@ int32_t taosStepExec(SSteps *steps) {
if (steps == NULL) return -1; if (steps == NULL) return -1;
for (int32_t s = 0; s < steps->cursize; s++) { for (int32_t s = 0; s < steps->cursize; s++) {
SStepObj *step = steps->steps + s; SStep *step = steps->steps + s;
if (step->initFp == NULL) continue; if (step->initFp == NULL) continue;
if (step->reportFp != NULL) { if (steps->reportFp != NULL) {
(*step->reportFp)(step->parent, step->name, "start initialize"); (*steps->reportFp)(step->name, "start initialize");
} }
int32_t code = (*step->initFp)(step->parent, step->self); int32_t code = (*step->initFp)(step->self);
if (code != 0) { if (code != 0) {
uDebug("step:%s will cleanup", step->name); uDebug("step:%s will cleanup", step->name);
taosStepCleanupImp(steps, s); taosStepCleanupImp(steps, s);
...@@ -71,8 +86,8 @@ int32_t taosStepExec(SSteps *steps) { ...@@ -71,8 +86,8 @@ int32_t taosStepExec(SSteps *steps) {
uInfo("step:%s is initialized", step->name); uInfo("step:%s is initialized", step->name);
if (step->reportFp != NULL) { if (steps->reportFp != NULL) {
(*step->reportFp)(step->parent, step->name, "initialize completed"); (*steps->reportFp)(step->name, "initialize completed");
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册