From e168709b729ee1cf9af508c9ae77659419901ac3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 5 Oct 2021 13:16:54 +0800 Subject: [PATCH] [TD-10430] refact interface of dnode --- include/libs/transport/trpc.h | 7 +- include/server/dnode/dnode.h | 22 +- include/server/mnode/mnode.h | 41 +-- include/server/vnode/vnode.h | 34 +-- include/util/tstep.h | 31 +-- source/libs/transport/src/rpcMain.c | 14 +- source/libs/transport/src/rpcTcp.c | 14 +- source/server/dnode/inc/dnodeCfg.h | 3 +- source/server/dnode/inc/dnodeCheck.h | 3 +- source/server/dnode/inc/dnodeEps.h | 5 +- source/server/dnode/inc/dnodeInt.h | 2 + source/server/dnode/inc/dnodeMain.h | 17 +- source/server/dnode/inc/dnodeMnodeEps.h | 5 +- source/server/dnode/inc/dnodeStatus.h | 5 +- .../inc/{dnodeTelemetry.h => dnodeTelem.h} | 5 +- source/server/dnode/inc/dnodeTrans.h | 32 +-- source/server/dnode/src/dnodeCfg.c | 3 +- source/server/dnode/src/dnodeCheck.c | 3 +- source/server/dnode/src/dnodeEps.c | 11 +- source/server/dnode/src/dnodeInt.c | 197 +++----------- source/server/dnode/src/dnodeMain.c | 32 +-- source/server/dnode/src/dnodeMnodeEps.c | 11 +- source/server/dnode/src/dnodeStatus.c | 18 +- .../src/{dnodeTelemetry.c => dnodeTelem.c} | 14 +- source/server/dnode/src/dnodeTrans.c | 242 +++++++++--------- source/server/mnode/src/mnodeMain.c | 20 +- source/server/server.c | 6 +- source/server/vnode/src/vnodeMain.c | 14 +- source/util/src/tstep.c | 41 ++- 29 files changed, 325 insertions(+), 527 deletions(-) rename source/server/dnode/inc/{dnodeTelemetry.h => dnodeTelem.h} (89%) rename source/server/dnode/src/{dnodeTelemetry.c => dnodeTelem.c} (96%) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 2a0efab213..0ce2e3da14 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -64,9 +64,6 @@ typedef struct SRpcInit { int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int idleTime; // milliseconds, 0 means idle timer is disabled - // owner of the rpc client/server, - void *owner; // set by the app when rpc init - // the following is for client app ecurity only char *user; // user name char spi; // security parameter index @@ -75,10 +72,10 @@ typedef struct SRpcInit { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(void *owner, SRpcMsg *, SRpcEpSet *); + void (*cfp)(SRpcMsg *, SRpcEpSet *); // call back to retrieve the client auth info, for server app only - int (*afp)(void *owner, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); + int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); } SRpcInit; int32_t rpcInit(); diff --git a/include/server/dnode/dnode.h b/include/server/dnode/dnode.h index a41235e7c9..d7aaa0e008 100644 --- a/include/server/dnode/dnode.h +++ b/include/server/dnode/dnode.h @@ -22,58 +22,50 @@ extern "C" { struct SRpcEpSet; struct SRpcMsg; -struct Dnode; - /** * Initialize and start the dnode module. * - * @return Instance of dnode module. + * @return Error code. */ -struct Dnode *dnodeCreateInstance(); +int32_t dnodeInit(); /** * Stop and cleanup dnode module. - * - * @param dnode, instance of dnode module. */ -void dnodeDropInstance(struct Dnode *dnode); +void dnodeCleanup(); /** * Send messages to other dnodes, such as create vnode message. * - * @param dnode, the instance of Dnode module. * @param epSet, the endpoint list of the dnodes. * @param rpcMsg, message to be sent. */ -void dnodeSendMsgToDnode(struct Dnode *dnode, struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); /** * Send messages to mnode, such as config message. * - * @param dnode, the instance of dnode module. * @param rpcMsg, message to be sent. */ -void dnodeSendMsgToMnode(struct Dnode *dnode, struct SRpcMsg *rpcMsg); +void dnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); /** * Send redirect message to dnode or shell. * - * @param dnode, the instance of dnode module. * @param rpcMsg, message to be sent. * @param forShell, used to identify whether to send to shell or dnode. */ -void dnodeSendRedirectMsg(struct Dnode *dnode, struct SRpcMsg *rpcMsg, bool forShell); +void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); /** * Get the corresponding endpoint information from dnodeId. * - * @param dnode, the instance of dnode module. * @param dnodeId, the id ot dnode. * @param ep, the endpoint of dnode. * @param fqdn, the fqdn of dnode. * @param port, the port of dnode. */ -void dnodeGetDnodeEp(struct Dnode *dnode, int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); +void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); #ifdef __cplusplus } diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index 7a06880a16..e78994fc9c 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -20,34 +20,29 @@ extern "C" { #endif -struct Dnode; - typedef struct { /** * Send messages to other dnodes, such as create vnode message. * - * @param dnode, the instance of dnode module. * @param epSet, the endpoint list of the dnodes. * @param rpcMsg, message to be sent. */ - void (*SendMsgToDnode)(struct Dnode *dnode, struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); + void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); /** * Send messages to mnode, such as config message. * - * @param dnode, the instance of dnode module. * @param rpcMsg, message to be sent. */ - void (*SendMsgToMnode)(struct Dnode *dnode, struct SRpcMsg *rpcMsg); + void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg); /** * Send redirect message to dnode or shell. * - * @param dnode, the instance of dnode module. * @param rpcMsg, message to be sent. * @param forShell, used to identify whether to send to shell or dnode. */ - void (*SendRedirectMsg)(struct Dnode *dnode, struct SRpcMsg *rpcMsg, bool forShell); + void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); /** * Get the corresponding endpoint information from dnodeId. @@ -58,12 +53,11 @@ typedef struct { * @param fqdn, the fqdn of dnode. * @param port, the port of dnode. */ - void (*GetDnodeEp)(struct Dnode *dnode, int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); + void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); } SMnodeFp; typedef struct { - struct Dnode *dnode; SMnodeFp fp; char clusterId[TSDB_CLUSTER_ID_LEN]; int32_t dnodeId; @@ -73,40 +67,34 @@ typedef struct { * Initialize and start mnode module. * * @param para, initialization parameters. - * @return Instance of mnode module. + * @return Error code. */ -struct Mnode *mnodeCreateInstance(SMnodePara para); +int32_t mnodeInit(SMnodePara para); /** * Stop and cleanup mnode module. - * - * @param mnode, instance of mnode module. */ -void mnodeDropInstance(struct Mnode *vnode); +void mnodeCleanup(); /** * Deploy mnode instances in dnode. * - * @param mnode, instance of mnode module. * @param minfos, server information used to deploy the mnode instance. * @return Error Code. */ -int32_t mnodeDeploy(struct Mnode *mnode, struct SMInfos *minfos); +int32_t mnodeDeploy(struct SMInfos *minfos); /** * Delete the mnode instance deployed in dnode. - * - * @param mnode, instance of mnode module. */ -void mnodeUnDeploy(struct Mnode *mnode); +void mnodeUnDeploy(); /** * Whether the mnode is in service. * - * @param mnode, instance of mnode module. * @return Server status. */ -bool mnodeIsServing(struct Mnode *mnode); +bool mnodeIsServing(); typedef struct { int64_t numOfDnode; @@ -124,16 +112,14 @@ typedef struct { /** * Get the statistical information of Mnode. * - * @param mnode, instance of mnode module. * @param stat, statistical information. * @return Error Code. */ -int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat); +int32_t mnodeGetStatistics(SMnodeStat *stat); /** * Get the statistical information of Mnode. * - * @param mnode, instance of mnode module. * @param user, username. * @param spi, security parameter index. * @param encrypt, encrypt algorithm. @@ -141,16 +127,15 @@ int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat); * @param ckey, ciphering key. * @return Error Code. */ -int32_t mnodeRetriveAuth(struct Mnode *mnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); +int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); /** * Interface for processing messages. * - * @param mnode, instance of mnode module. * @param rpcMsg, message to be processed. * @return Error code. */ -void mnodeProcessMsg(struct Mnode *mnode, SRpcMsg *rpcMsg); +void mnodeProcessMsg(SRpcMsg *rpcMsg); #ifdef __cplusplus } diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index ee015788d3..8fd4fd433f 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -20,41 +20,35 @@ extern "C" { #endif -struct Dnode; - typedef struct { /** * Send messages to other dnodes, such as create vnode message. * - * @param dnode, the instance of dnode module. * @param epSet, the endpoint list of dnodes. * @param rpcMsg, message to be sent. */ - void (*SendMsgToDnode)(struct Dnode *dnode, struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); + void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); /** * Send messages to mnode, such as config message. * - * @param dnode, the instance of dnode module. * @param rpcMsg, message to be sent. */ - void (*SendMsgToMnode)(struct Dnode *dnode, struct SRpcMsg *rpcMsg); + void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg); /** * Get the corresponding endpoint information from dnodeId. * - * @param dnode, the instance of dnode module. * @param dnodeId, the id ot dnode. * @param ep, the endpoint of dnode. * @param fqdn, the fqdn of dnode. * @param port, the port of dnode. */ - void (*GetDnodeEp)(struct Dnode *dnode, int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); + void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); } SVnodeFp; typedef struct { - struct Dnode *dnode; SVnodeFp fp; } SVnodePara; @@ -62,16 +56,14 @@ typedef struct { * Start initialize vnode module. * * @param para, initialization parameters. - * @return Instance of vnode module. + * @return Error code. */ -struct Vnode *vnodeCreateInstance(SVnodePara para); +int32_t vnodeInit(SVnodePara para); /** * Cleanup vnode module. - * - * @param vnode, instance of vnode module. */ -void vnodeDropInstance(struct Vnode *vnode); +void vnodeCleanup(); typedef struct { int32_t unused; @@ -80,36 +72,32 @@ typedef struct { /** * Get the statistical information of vnode. * - * @param vnode, instance of vnode module. - * @param sta, statistical information. + * @param stat, statistical information. * @return Error Code. */ -int32_t vnodeGetStatistics(struct Vnode *vnode, SVnodeStat *stat); +int32_t vnodeGetStatistics(SVnodeStat *stat); /** * Get the status of all vnodes. * - * @param vnode, instance of vnode module. * @param status, status msg. */ -void vnodeGetStatus(struct Vnode *vnode, struct SStatusMsg *status); +void vnodeGetStatus(struct SStatusMsg *status); /** * Set access permissions for all vnodes. * - * @param vnode, instance of vnode module. * @param access, access permissions of vnodes. * @param numOfVnodes, the size of vnodes. */ -void vnodeSetAccess(struct Vnode *vnode, struct SVgroupAccess *access, int32_t numOfVnodes); +void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes); /** * Interface for processing messages. * - * @param vnode, instance of vnode module. * @param msg, message to be processed. */ -void vnodeProcessMsg(struct Vnode *vnode, SRpcMsg *msg); +void vnodeProcessMsg(SRpcMsg *msg); #ifdef __cplusplus } diff --git a/include/util/tstep.h b/include/util/tstep.h index 90dd5dd0fc..87e95edd97 100644 --- a/include/util/tstep.h +++ b/include/util/tstep.h @@ -20,29 +20,14 @@ extern "C" { #endif -typedef int32_t (*FnInitObj)(void *parent, void **self); -typedef void (*FnCleanupObj)(void **self); -typedef void (*FnReportProgress)(void *parent, const char *name, const char *desc); - -typedef struct SStepObj { - const char * name; - void * parent; - void ** self; - FnInitObj initFp; - FnCleanupObj cleanupFp; - FnReportProgress reportFp; -} SStepObj; - -typedef struct SSteps { - int32_t cursize; - int32_t maxsize; - SStepObj *steps; -} SSteps; - -SSteps *taosStepInit(int32_t stepsize); -int32_t taosStepAdd(SSteps *steps, SStepObj *step); -int32_t taosStepExec(SSteps *steps); -void taosStepCleanup(SSteps *steps); +typedef int32_t (*InitFp)(void **obj); +typedef void (*CleanupFp)(void **obj); +typedef void (*ReportFp)(char *name, char *desc); + +struct SSteps *taosStepInit(int32_t maxsize, ReportFp fp); +int32_t taosStepExec(struct SSteps *steps); +void taosStepCleanup(struct SSteps *steps); +int32_t taosStepAdd(struct SSteps *steps, char *name, void **obj, InitFp initFp, CleanupFp cleanupFp); #ifdef __cplusplus } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 800b146713..1f4a1ba3cf 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -54,9 +54,8 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void *owner; - void (*cfp)(void * owner, SRpcMsg *, SRpcEpSet *); - int (*afp)(void * owner, char *user, char *spi, char *encrypt, char *secret, char *ckey); + void (*cfp)(SRpcMsg *, SRpcEpSet *); + int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t refCount; void *idPool; // handle to ID pool @@ -259,7 +258,6 @@ void *rpcOpen(const SRpcInit *pInit) { if (pInit->secret) memcpy(pRpc->secret, pInit->secret, sizeof(pRpc->secret)); if (pInit->ckey) tstrncpy(pRpc->ckey, pInit->ckey, sizeof(pRpc->ckey)); pRpc->spi = pInit->spi; - pRpc->owner = pInit->owner; pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; pRpc->refCount = 1; @@ -742,7 +740,7 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { if (pConn->user[0] == 0) { terrno = TSDB_CODE_RPC_AUTH_REQUIRED; } else { - terrno = (*pRpc->afp)(pRpc->owner, pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); + terrno = (*pRpc->afp)(pConn->user, &pConn->spi, &pConn->encrypt, pConn->secret, pConn->ckey); } if (terrno != 0) { @@ -1022,7 +1020,7 @@ static void doRpcReportBrokenLinkToServer(void *param, void *id) { SRpcMsg *pRpcMsg = (SRpcMsg *)(param); SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle); SRpcInfo *pRpc = pConn->pRpc; - (*(pRpc->cfp))(pRpc->owner, pRpcMsg, NULL); + (*(pRpc->cfp))(pRpcMsg, NULL); free(pRpcMsg); } static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { @@ -1137,7 +1135,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet; - (*pRpc->cfp)(pRpc->owner, pMsg, pEpSet); + (*pRpc->cfp)(pMsg, pEpSet); } // free the request message @@ -1161,7 +1159,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcAddRef(pRpc); // add the refCount for requests // notify the server app - (*(pRpc->cfp))(pRpc->owner, &rpcMsg, NULL); + (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response rpcMsg.handle = pContext; diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index d853fdb9f1..d0710c883f 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -194,7 +194,7 @@ void taosStopTcpServer(void *handle) { pServerObj->stop = 1; if (pServerObj->fd >= 0) { - taosShutDownSocketRD(pServerObj->fd); + taosShutDownSocketRD(pServerObj->fd); } if (taosCheckPthreadValid(pServerObj->thread)) { if (taosComparePthread(pServerObj->thread, pthread_self())) { @@ -257,8 +257,8 @@ static void *taosAcceptTcpConnection(void *arg) { int32_t ret = taosSetSockOpt(connFd, SOL_SOCKET, SO_RCVTIMEO, &to, sizeof(to)); if (ret != 0) { taosCloseSocket(connFd); - tError("%s failed to set recv timeout fd(%s)for connection from:%hu", pServerObj->label, strerror(errno), - htons(caddr.sin_port)); + tError("%s failed to set recv timeout fd(%s)for connection from:%s:%hu", pServerObj->label, strerror(errno), + taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); continue; } @@ -270,12 +270,12 @@ static void *taosAcceptTcpConnection(void *arg) { if (pFdObj) { pFdObj->ip = caddr.sin_addr.s_addr; pFdObj->port = htons(caddr.sin_port); - tDebug("%s new TCP connection from %hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, - pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); + tDebug("%s new TCP connection from %s:%hu, fd:%d FD:%p numOfFds:%d", pServerObj->label, + taosInetNtoa(caddr.sin_addr), pFdObj->port, connFd, pFdObj, pThreadObj->numOfFds); } else { taosCloseSocket(connFd); - tError("%s failed to malloc FdObj(%s) for connection from:%hu", pServerObj->label, strerror(errno), - htons(caddr.sin_port)); + tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno), + taosInetNtoa(caddr.sin_addr), htons(caddr.sin_port)); } // pick up next thread for next connection diff --git a/source/server/dnode/inc/dnodeCfg.h b/source/server/dnode/inc/dnodeCfg.h index 2284486e51..7a0d830a4b 100644 --- a/source/server/dnode/inc/dnodeCfg.h +++ b/source/server/dnode/inc/dnodeCfg.h @@ -22,7 +22,6 @@ extern "C" { #include "dnodeInt.h" typedef struct DnCfg { - Dnode * dnode; int32_t dnodeId; int32_t dropped; char clusterId[TSDB_CLUSTER_ID_LEN]; @@ -30,7 +29,7 @@ typedef struct DnCfg { pthread_mutex_t mutex; } DnCfg; -int32_t dnodeInitCfg(Dnode *dnode, DnCfg **cfg); +int32_t dnodeInitCfg(DnCfg **cfg); void dnodeCleanupCfg(DnCfg **cfg); void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data); int32_t dnodeGetDnodeId(DnCfg *cfg); diff --git a/source/server/dnode/inc/dnodeCheck.h b/source/server/dnode/inc/dnodeCheck.h index 84232fbc1d..db275259f5 100644 --- a/source/server/dnode/inc/dnodeCheck.h +++ b/source/server/dnode/inc/dnodeCheck.h @@ -22,10 +22,9 @@ extern "C" { #include "dnodeInt.h" typedef struct DnCheck { - Dnode *dnode; } DnCheck; -int32_t dnodeInitCheck(Dnode *dnode, DnCheck **check); +int32_t dnodeInitCheck(DnCheck **check); void dnodeCleanupCheck(DnCheck **check); #ifdef __cplusplus diff --git a/source/server/dnode/inc/dnodeEps.h b/source/server/dnode/inc/dnodeEps.h index aec412ee59..79c49bde40 100644 --- a/source/server/dnode/inc/dnodeEps.h +++ b/source/server/dnode/inc/dnodeEps.h @@ -23,7 +23,6 @@ extern "C" { #include "dnodeInt.h" typedef struct DnEps { - Dnode * dnode; int32_t dnodeId; int32_t dnodeNum; SDnodeEp * dnodeList; @@ -32,11 +31,11 @@ typedef struct DnEps { pthread_mutex_t mutex; } DnEps; -int32_t dnodeInitEps(Dnode *dnode, DnEps **eps); +int32_t dnodeInitEps(DnEps **eps); void dnodeCleanupEps(DnEps **eps); void dnodeUpdateEps(DnEps *eps, SDnodeEps *data); bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr); -void dnodeGetDnodeEp(Dnode *dnode, int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); +void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeInt.h b/source/server/dnode/inc/dnodeInt.h index b0e5a448e0..b32e23db14 100644 --- a/source/server/dnode/inc/dnodeInt.h +++ b/source/server/dnode/inc/dnodeInt.h @@ -52,6 +52,8 @@ typedef struct Dnode { struct Vnode * vnode; } Dnode; +Dnode* dnodeInst(); + #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} #define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", 255, __VA_ARGS__); }} diff --git a/source/server/dnode/inc/dnodeMain.h b/source/server/dnode/inc/dnodeMain.h index f9c15db1c1..9fe5d64800 100644 --- a/source/server/dnode/inc/dnodeMain.h +++ b/source/server/dnode/inc/dnodeMain.h @@ -28,21 +28,20 @@ typedef enum { } RunStat; typedef struct DnMain { - Dnode * dnode; RunStat runStatus; void * dnodeTimer; SStartupStep startup; } DnMain; -int32_t dnodeInitMain(Dnode *dnode, DnMain **main); +int32_t dnodeInitMain(DnMain **main); void dnodeCleanupMain(DnMain **main); -int32_t dnodeInitStorage(Dnode *dnode, void **unused); -void dnodeCleanupStorage(void **unused); -void dnodeReportStartup(Dnode *dnode, char *name, char *desc); -void dnodeReportStartupFinished(Dnode *dnode, char *name, char *desc); -void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg); -void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg); -void dnodeProcessConfigDnodeReq(Dnode *dnode, SRpcMsg *pMsg); +int32_t dnodeInitStorage(); +void dnodeCleanupStorage(); +void dnodeReportStartup(char *name, char *desc); +void dnodeReportStartupFinished(char *name, char *desc); +void dnodeProcessStartupReq(SRpcMsg *pMsg); +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); +void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeMnodeEps.h b/source/server/dnode/inc/dnodeMnodeEps.h index 950fc3783d..b94b08c304 100644 --- a/source/server/dnode/inc/dnodeMnodeEps.h +++ b/source/server/dnode/inc/dnodeMnodeEps.h @@ -22,20 +22,19 @@ extern "C" { #include "dnodeInt.h" typedef struct DnMnEps { - Dnode * dnode; SRpcEpSet mnodeEpSet; SMInfos mnodeInfos; char file[PATH_MAX + 20]; pthread_mutex_t mutex; } DnMnEps; -int32_t dnodeInitMnodeEps(Dnode *dnode, DnMnEps **meps); +int32_t dnodeInitMnodeEps(DnMnEps **meps); void dnodeCleanupMnodeEps(DnMnEps **meps); void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *pMinfos); void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *pEpSet); void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet); void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet); -void dnodeSendRedirectMsg(Dnode *dnode, SRpcMsg *rpcMsg, bool forShell); +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeStatus.h b/source/server/dnode/inc/dnodeStatus.h index ed7782fb36..cfd58578c0 100644 --- a/source/server/dnode/inc/dnodeStatus.h +++ b/source/server/dnode/inc/dnodeStatus.h @@ -22,15 +22,14 @@ extern "C" { #include "dnodeInt.h" typedef struct DnStatus { - Dnode * dnode; void * dnodeTimer; void * statusTimer; uint32_t rebootTime; } DnStatus; -int32_t dnodeInitStatus(Dnode *dnode, DnStatus **status); +int32_t dnodeInitStatus(DnStatus **status); void dnodeCleanupStatus(DnStatus **status); -void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg); +void dnodeProcessStatusRsp(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeTelemetry.h b/source/server/dnode/inc/dnodeTelem.h similarity index 89% rename from source/server/dnode/inc/dnodeTelemetry.h rename to source/server/dnode/inc/dnodeTelem.h index e3e097d2f2..48b10399bd 100644 --- a/source/server/dnode/inc/dnodeTelemetry.h +++ b/source/server/dnode/inc/dnodeTelem.h @@ -26,7 +26,6 @@ extern "C" { * thus we use pthread_mutex_t/pthread_cond_t to simulate */ typedef struct DnTelem { - Dnode * dnode; bool enable; pthread_mutex_t lock; pthread_cond_t cond; @@ -35,8 +34,8 @@ typedef struct DnTelem { char email[TSDB_FQDN_LEN]; } DnTelem; -int32_t dnodeInitTelemetry(Dnode *dnode, DnTelem **telem); -void dnodeCleanupTelemetry(DnTelem **telem); +int32_t dnodeInitTelem(DnTelem **telem); +void dnodeCleanupTelem(DnTelem **telem); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeTrans.h b/source/server/dnode/inc/dnodeTrans.h index ad2894c6f2..306624428b 100644 --- a/source/server/dnode/inc/dnodeTrans.h +++ b/source/server/dnode/inc/dnodeTrans.h @@ -21,30 +21,24 @@ extern "C" { #endif #include "dnodeInt.h" -typedef void (*RpcMsgCfp)(void *owner, SRpcMsg *pMsg, SRpcEpSet *pEpSet); -typedef void (*RpcMsgFp)(void *owner, SRpcMsg *pMsg); - -typedef struct DnMsgFp { - void * module; - RpcMsgFp fp; -} DnMsgFp; +typedef void (*RpcMsgFp)( SRpcMsg *pMsg); typedef struct DnTrans { - Dnode * dnode; - void * serverRpc; - void * clientRpc; - void * shellRpc; - int32_t queryReqNum; - int32_t submitReqNum; - DnMsgFp fpPeerMsg[TSDB_MSG_TYPE_MAX]; - DnMsgFp fpShellMsg[TSDB_MSG_TYPE_MAX]; + void * serverRpc; + void * clientRpc; + void * shellRpc; + int32_t queryReqNum; + int32_t submitReqNum; + RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; + RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX]; + } DnTrans; -int32_t dnodeInitTrans(Dnode *dnode, DnTrans **trans); +int32_t dnodeInitTrans(DnTrans **rans); void dnodeCleanupTrans(DnTrans **trans); -void dnodeSendMsgToMnode(Dnode *dnode, SRpcMsg *rpcMsg); -void dnodeSendMsgToDnode(Dnode *dnode, SRpcEpSet *epSet, SRpcMsg *rpcMsg); -void dnodeSendMsgToDnodeRecv(Dnode *dnode, SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); +void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); +void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); #ifdef __cplusplus } diff --git a/source/server/dnode/src/dnodeCfg.c b/source/server/dnode/src/dnodeCfg.c index 2162e3f791..77329e2ad1 100644 --- a/source/server/dnode/src/dnodeCfg.c +++ b/source/server/dnode/src/dnodeCfg.c @@ -103,11 +103,10 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) { return 0; } -int32_t dnodeInitCfg(Dnode *dnode, DnCfg **out) { +int32_t dnodeInitCfg(DnCfg **out) { DnCfg* cfg = calloc(1, sizeof(DnCfg)); if (cfg == NULL) return -1; - cfg->dnode = dnode; cfg->dnodeId = 0; cfg->dropped = 0; cfg->clusterId[0] = 0; diff --git a/source/server/dnode/src/dnodeCheck.c b/source/server/dnode/src/dnodeCheck.c index 7c3b539d70..9175b4e519 100644 --- a/source/server/dnode/src/dnodeCheck.c +++ b/source/server/dnode/src/dnodeCheck.c @@ -145,11 +145,10 @@ static int32_t dnodeCheckAccess() { return 0; } static int32_t dnodeCheckVersion() { return 0; } static int32_t dnodeCheckDatafile() { return 0; } -int32_t dnodeInitCheck(Dnode *dnode, DnCheck **out) { +int32_t dnodeInitCheck(DnCheck **out) { DnCheck *check = calloc(1, sizeof(DnCheck)); if (check == NULL) return -1; - check->dnode = dnode; *out = check; if (dnodeCheckNetwork() != 0) { diff --git a/source/server/dnode/src/dnodeEps.c b/source/server/dnode/src/dnodeEps.c index 8595b1b339..e633da9fc9 100644 --- a/source/server/dnode/src/dnodeEps.c +++ b/source/server/dnode/src/dnodeEps.c @@ -182,15 +182,14 @@ static int32_t dnodeWriteEps(DnEps *eps) { return 0; } -int32_t dnodeInitEps(Dnode *dnode, DnEps **out) { +int32_t dnodeInitEps(DnEps **out) { DnEps *eps = calloc(1, sizeof(DnEps)); if (eps == NULL) return -1; eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (eps->dnodeHash == NULL) return -1; - eps->dnode = dnode; - eps->dnodeId = dnode->cfg->dnodeId; + eps->dnodeId = dnodeInst()->cfg->dnodeId; eps->dnodeNum = 0; snprintf(eps->file, sizeof(eps->file), "%s/dnodeEps.json", tsDnodeDir); pthread_mutex_init(&eps->mutex, NULL); @@ -269,10 +268,8 @@ bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) { return changed; } -void dnodeGetDnodeEp(Dnode *dnode, int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { - assert(dnode != NULL); - - DnEps *eps = dnode->eps; +void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { + DnEps *eps = dnodeInst()->eps; pthread_mutex_lock(&eps->mutex); SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t)); diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c index f38d9665ac..8c795be9de 100644 --- a/source/server/dnode/src/dnodeInt.c +++ b/source/server/dnode/src/dnodeInt.c @@ -28,202 +28,75 @@ #include "dnodeMain.h" #include "dnodeMnodeEps.h" #include "dnodeStatus.h" -#include "dnodeTelemetry.h" +#include "dnodeTelem.h" #include "dnodeTrans.h" #include "mnode.h" #include "vnode.h" -static int32_t dnodeInitRpcEnv(Dnode *dnode, void **unUsed) { return rpcInit(); } -static void dnodeCleanupRpcEnv(void **unUsed) { rpcCleanup(); } -#if 0 -static int32_t dnodeInitTfsEnv(Dnode *dnode, void **unUsed) { return tfInit(); } -static void dnodeCleanupTfsEnv(void **unUsed) { tfCleanup(); } -static int32_t dnodeInitScriptEnv(Dnode *dnode, void **unUsed) { return scriptEnvPoolInit(); } -static void dnodeCleanupScriptEnv(void **unUsed) { scriptEnvPoolCleanup(); } -static int32_t dnodeInitWalEnv(Dnode *dnode, void **unUsed) { return walInit(); } -static void dnodeCleanupWalEnv(void **unUsed) { walCleanUp(); } -static int32_t dnodeInitSyncEnv(Dnode *dnode, void **unUsed) { return syncInit(); } -static void dnodeCleanupSyncEnv(void **unUsed) { syncCleanUp(); } -#endif +static Dnode tsDnode = {0}; + +Dnode *dnodeInst() { return &tsDnode; } -static int32_t dnodeInitVnodeModule(Dnode *dnode, struct Vnode** out) { +static int32_t dnodeInitVnodeModule(void **unused) { SVnodePara para; para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; - para.dnode = dnode; - - struct Vnode *vnode = vnodeCreateInstance(para); - if (vnode == NULL) { - return -1; - } - *out = vnode; - return 0; + return vnodeInit(para); } -static void dnodeCleanupVnodeModule(Dnode *dnode, struct Vnode **out) { - struct Vnode *vnode = *out; - *out = NULL; - vnodeDropInstance(vnode); -} +static int32_t dnodeInitMnodeModule(void **unused) { + Dnode *dnode = dnodeInst(); -static int32_t dnodeInitMnodeModule(Dnode *dnode, struct Mnode **out) { SMnodePara para; para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendRedirectMsg = dnodeSendRedirectMsg; - para.dnode = dnode; para.dnodeId = dnode->cfg->dnodeId; strncpy(para.clusterId, dnode->cfg->clusterId, sizeof(para.clusterId)); - struct Mnode *mnode = mnodeCreateInstance(para); - if (mnode == NULL) { - return -1; - } - - *out = mnode; - return 0; -} - -static void dnodeCleanupMnodeModule(Dnode *dnode, struct Mnode **out) { - struct Mnode *mnode = *out; - *out = NULL; - mnodeDropInstance(mnode); + return mnodeInit(para); } -Dnode *dnodeCreateInstance() { - Dnode *dnode = calloc(1, sizeof(Dnode)); - if (dnode == NULL) { - return NULL; - } - - SSteps *steps = taosStepInit(24); - if (steps == NULL) { - return NULL; - } - - SStepObj step = {0}; - step.parent = dnode; - - step.name = "dnode-main"; - step.self = (void **)&dnode->main; - step.initFp = (FnInitObj)dnodeInitMain; - step.cleanupFp = (FnCleanupObj)dnodeCleanupMain; - step.reportFp = NULL; - taosStepAdd(steps, &step); - - step.name = "dnode-storage"; - step.self = NULL; - step.initFp = (FnInitObj)dnodeInitStorage; - step.cleanupFp = (FnCleanupObj)dnodeCleanupStorage; - step.reportFp = (FnReportProgress)dnodeReportStartup; - taosStepAdd(steps, &step); - -#if 0 - step.name = "dnode-tfs-env"; - step.self = NULL; - step.initFp = (FnInitObj)dnodeInitTfsEnv; - step.cleanupFp = (FnCleanupObj)dnodeCleanupTfsEnv; - taosStepAdd(steps, &step); -#endif - - step.name = "dnode-rpc-env"; - step.self = NULL; - step.initFp = (FnInitObj)dnodeInitRpcEnv; - step.cleanupFp = (FnCleanupObj)dnodeCleanupRpcEnv; - taosStepAdd(steps, &step); - - step.name = "dnode-check"; - step.self = (void **)&dnode->check; - step.initFp = (FnInitObj)dnodeInitCheck; - step.cleanupFp = (FnCleanupObj)dnodeCleanupCheck; - taosStepAdd(steps, &step); - - step.name = "dnode-cfg"; - step.self = (void **)&dnode->cfg; - step.initFp = (FnInitObj)dnodeInitCfg; - step.cleanupFp = (FnCleanupObj)dnodeCleanupCfg; - taosStepAdd(steps, &step); - - step.name = "dnode-deps"; - step.self = (void **)&dnode->eps; - step.initFp = (FnInitObj)dnodeInitEps; - step.cleanupFp = (FnCleanupObj)dnodeCleanupEps; - taosStepAdd(steps, &step); - - step.name = "dnode-meps"; - step.self = (void **)&dnode->meps; - step.initFp = (FnInitObj)dnodeInitMnodeEps; - step.cleanupFp = (FnCleanupObj)dnodeCleanupMnodeEps; - taosStepAdd(steps, &step); - -#if 0 - step.name = "dnode-wal"; - step.self = NULL; - step.initFp = (FnInitObj)dnodeInitWalEnv; - step.cleanupFp = (FnCleanupObj)dnodeCleanupWalEnv; - taosStepAdd(steps, &step); - - step.name = "dnode-sync"; - step.self = NULL; - step.initFp = (FnInitObj)dnodeInitSyncEnv; - step.cleanupFp = (FnCleanupObj)dnodeCleanupSyncEnv; - taosStepAdd(steps, &step); -#endif - - step.name = "dnode-vnode"; - step.self = (void **)&dnode->vnode; - step.initFp = (FnInitObj)dnodeInitVnodeModule; - step.cleanupFp = (FnCleanupObj)dnodeCleanupVnodeModule; - taosStepAdd(steps, &step); - - step.name = "dnode-mnode"; - step.self = (void **)&dnode->mnode; - step.initFp = (FnInitObj)dnodeInitMnodeModule; - step.cleanupFp = (FnCleanupObj)dnodeCleanupMnodeModule; - taosStepAdd(steps, &step); - - step.name = "dnode-trans"; - step.self = (void **)&dnode->trans; - step.initFp = (FnInitObj)dnodeInitTrans; - step.cleanupFp = (FnCleanupObj)dnodeCleanupTrans; - taosStepAdd(steps, &step); - - step.name = "dnode-status"; - step.self = (void **)&dnode->status; - step.initFp = (FnInitObj)dnodeInitStatus; - step.cleanupFp = (FnCleanupObj)dnodeCleanupStatus; - taosStepAdd(steps, &step); - - step.name = "dnode-telem"; - step.self = (void **)&dnode->telem; - step.initFp = (FnInitObj)dnodeInitTelemetry; - step.cleanupFp = (FnCleanupObj)dnodeCleanupTelemetry; - taosStepAdd(steps, &step); - -#if 0 - step.name = "dnode-script"; - step.self = NULL; - step.initFp = (FnInitObj)dnodeInitScriptEnv; - step.cleanupFp = (FnCleanupObj)dnodeCleanupScriptEnv; - taosStepAdd(steps, &step); -#endif +int32_t dnodeInit() { + struct SSteps *steps = taosStepInit(24, dnodeReportStartup); + if (steps == NULL) return -1; + + Dnode *dnode = dnodeInst(); + + taosStepAdd(steps, "dnode-main", (void **)&dnode->main, (InitFp)dnodeInitMain, (CleanupFp)dnodeCleanupMain); + taosStepAdd(steps, "dnode-storage", NULL, (InitFp)dnodeInitStorage, (CleanupFp)dnodeCleanupStorage); + //taosStepAdd(steps, "dnode-tfs-env", NULL, (InitFp)tfInit, (CleanupFp)tfCleanup); + taosStepAdd(steps, "dnode-rpc-env", NULL, (InitFp)rpcInit, (CleanupFp)rpcCleanup); + taosStepAdd(steps, "dnode-check", (void **)&dnode->check, (InitFp)dnodeInitCheck, (CleanupFp)dnodeCleanupCheck); + taosStepAdd(steps, "dnode-cfg", (void **)&dnode->cfg, (InitFp)dnodeInitCfg, (CleanupFp)dnodeCleanupCfg); + taosStepAdd(steps, "dnode-deps", (void **)&dnode->eps, (InitFp)dnodeInitEps, (CleanupFp)dnodeCleanupEps); + taosStepAdd(steps, "dnode-meps", (void **)&dnode->meps, (InitFp)dnodeInitMnodeEps, (CleanupFp)dnodeCleanupMnodeEps); + //taosStepAdd(steps, "dnode-wal", NULL, (InitFp)walInit, (CleanupFp)walCleanUp); + //taosStepAdd(steps, "dnode-sync", NULL, (InitFp)syncInit, (CleanupFp)syncCleanUp); + taosStepAdd(steps, "dnode-vnode", NULL, (InitFp)dnodeInitVnodeModule, (CleanupFp)vnodeCleanup); + taosStepAdd(steps, "dnode-mnode", NULL, (InitFp)dnodeInitMnodeModule, (CleanupFp)mnodeCleanup); + taosStepAdd(steps, "dnode-trans", (void **)&dnode->trans, (InitFp)dnodeInitTrans, (CleanupFp)dnodeCleanupTrans); + taosStepAdd(steps, "dnode-status", (void **)&dnode->status, (InitFp)dnodeInitStatus, (CleanupFp)dnodeCleanupStatus); + taosStepAdd(steps, "dnode-telem", (void **)&dnode->meps, (InitFp)dnodeInitTelem, (CleanupFp)dnodeCleanupTelem); + //taosStepAdd(steps, "dnode-script", NULL, (InitFp)scriptEnvPoolInit, (CleanupFp)scriptEnvPoolCleanup); dnode->steps = steps; taosStepExec(dnode->steps); if (dnode->main) { dnode->main->runStatus = TD_RUN_STAT_RUNNING; - dnodeReportStartupFinished(dnode, "TDengine", "initialized successfully"); + dnodeReportStartupFinished("TDengine", "initialized successfully"); dInfo("TDengine is initialized successfully"); } - return dnode; + return 0; } -void dnodeDropInstance(Dnode *dnode) { +void dnodeCleanup() { + Dnode *dnode = dnodeInst(); if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) { dnode->main->runStatus = TD_RUN_STAT_STOPPED; taosStepCleanup(dnode->steps); diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c index 7854fe2b4b..2845192def 100644 --- a/source/server/dnode/src/dnodeMain.c +++ b/source/server/dnode/src/dnodeMain.c @@ -55,11 +55,10 @@ void dnodePrintDiskInfo() { dInfo("=================================="); } -int32_t dnodeInitMain(Dnode *dnode, DnMain **out) { +int32_t dnodeInitMain(DnMain **out) { DnMain* main = calloc(1, sizeof(DnMain)); if (main == NULL) return -1; - main->dnode = dnode; main->runStatus = TD_RUN_STAT_STOPPED; main->dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); if (main->dnodeTimer == NULL) { @@ -75,9 +74,8 @@ int32_t dnodeInitMain(Dnode *dnode, DnMain **out) { taosResolveCRC(); taosInitGlobalCfg(); taosReadGlobalLogCfg(); -#if 0 taosSetCoreDump(tsEnableCoreFile); -#endif + if (!taosMkDir(tsLogDir)) { printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); @@ -121,7 +119,7 @@ void dnodeCleanupMain(DnMain **out) { free(main); } -int32_t dnodeInitStorage(Dnode *dnode, void **m) { +int32_t dnodeInitStorage() { #ifdef TD_TSZ // compress module init tsCompressInit(); @@ -191,7 +189,7 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) { return 0; } -void dnodeCleanupStorage(void **m) { +void dnodeCleanupStorage() { #if 0 // storage destroy tfsDestroy(); @@ -203,23 +201,26 @@ void dnodeCleanupStorage(void **m) { #endif } -void dnodeReportStartup(Dnode *dnode, char *name, char *desc) { +void dnodeReportStartup(char *name, char *desc) { + Dnode *dnode = dnodeInst(); SStartupStep *startup = &dnode->main->startup; tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->desc, desc, strlen(startup->desc)); startup->finished = 0; } -void dnodeReportStartupFinished(Dnode *dnode, char *name, char *desc) { +void dnodeReportStartupFinished(char *name, char *desc) { + Dnode *dnode = dnodeInst(); SStartupStep *startup = &dnode->main->startup; tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->desc, desc, strlen(startup->desc)); startup->finished = 1; } -void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg) { +void dnodeProcessStartupReq(SRpcMsg *pMsg) { dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); + Dnode *dnode = dnodeInst(); SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep)); @@ -230,10 +231,11 @@ void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -static int32_t dnodeStartMnode(Dnode *dnode, SRpcMsg *pMsg) { +static int32_t dnodeStartMnode(SRpcMsg *pMsg) { + Dnode *dnode = dnodeInst(); SCreateMnodeMsg *pCfg = pMsg->pCont; pCfg->dnodeId = htonl(pCfg->dnodeId); - if (pCfg->dnodeId != dnodeGetDnodeId(dnode->cfg)) { + if (pCfg->dnodeId != dnode->cfg->dnodeId) { dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId(dnode->cfg)); return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; @@ -252,11 +254,11 @@ static int32_t dnodeStartMnode(Dnode *dnode, SRpcMsg *pMsg) { if (mnodeIsServing(dnode->mnode)) return 0; - return mnodeDeploy(dnode->mnode, &pCfg->mnodes); + return mnodeDeploy(&pCfg->mnodes); } -void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg) { - int32_t code = dnodeStartMnode(dnode, pMsg); +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { + int32_t code = dnodeStartMnode(pMsg); SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; @@ -264,7 +266,7 @@ void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -void dnodeProcessConfigDnodeReq(Dnode *dnode, SRpcMsg *pMsg) { +void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { SCfgDnodeMsg *pCfg = pMsg->pCont; int32_t code = taosCfgDynamicOptions(pCfg->config); diff --git a/source/server/dnode/src/dnodeMnodeEps.c b/source/server/dnode/src/dnodeMnodeEps.c index 9858f9d21f..e072bb5bfd 100644 --- a/source/server/dnode/src/dnodeMnodeEps.c +++ b/source/server/dnode/src/dnodeMnodeEps.c @@ -179,7 +179,7 @@ PARSE_MINFOS_OVER: for (int32_t i = 0; i < mInfos.mnodeNum; ++i) { SMInfo *mInfo = &mInfos.mnodeInfos[i]; - dnodeGetDnodeEp(meps->dnode, mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); + dnodeGetDnodeEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); } dnodeResetMnodeEps(meps, &mInfos); @@ -191,8 +191,8 @@ PARSE_MINFOS_OVER: return 0; } -void dnodeSendRedirectMsg(struct Dnode *dnode, SRpcMsg *rpcMsg, bool forShell) { - DnMnEps *meps = dnode->meps; +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { + DnMnEps *meps = dnodeInst()->meps; SRpcConnInfo connInfo = {0}; rpcGetConnInfo(rpcMsg->handle, &connInfo); @@ -222,17 +222,16 @@ void dnodeSendRedirectMsg(struct Dnode *dnode, SRpcMsg *rpcMsg, bool forShell) { rpcSendRedirectRsp(rpcMsg->handle, &epSet); } -int32_t dnodeInitMnodeEps(Dnode *dnode, DnMnEps **out) { +int32_t dnodeInitMnodeEps(DnMnEps **out) { DnMnEps *meps = calloc(1, sizeof(DnMnEps)); if (meps == NULL) return -1; - meps->dnode = dnode; snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir); pthread_mutex_init(&meps->mutex, NULL); *out = meps; dnodeResetMnodeEps(meps, NULL); - int32_t ret = dnodeReadMnodeEps(meps, dnode->eps); + int32_t ret = dnodeReadMnodeEps(meps, dnodeInst()->eps); if (ret == 0) { dInfo("dnode mInfos is initialized"); } diff --git a/source/server/dnode/src/dnodeStatus.c b/source/server/dnode/src/dnodeStatus.c index 58effb5b44..308be7d1b6 100644 --- a/source/server/dnode/src/dnodeStatus.c +++ b/source/server/dnode/src/dnodeStatus.c @@ -46,7 +46,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { return; } - Dnode *dnode = status->dnode; + Dnode *dnode = dnodeInst(); dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId); pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg)); pStatus->version = htonl(tsVersion); @@ -76,16 +76,17 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { pStatus->clusterCfg.slaveQuery = tsEnableSlaveQuery; pStatus->clusterCfg.adjustMaster = tsEnableAdjustMaster; - vnodeGetStatus(dnode->vnode, pStatus); + vnodeGetStatus(pStatus); contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); pStatus->openVnodes = htons(pStatus->openVnodes); SRpcMsg rpcMsg = {.ahandle = status, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; - dnodeSendMsgToMnode(status->dnode, &rpcMsg); + dnodeSendMsgToMnode(&rpcMsg); } -void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) { +void dnodeProcessStatusRsp(SRpcMsg *pMsg) { + Dnode *dnode = dnodeInst(); DnStatus *status = pMsg->ahandle; if (pMsg->code != TSDB_CODE_SUCCESS) { @@ -114,7 +115,7 @@ void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) { pCfg->dnodeId = htonl(pCfg->dnodeId); dnodeUpdateCfg(dnode->cfg, pCfg); - vnodeSetAccess(dnode->vnode, pStatusRsp->vgAccess, pCfg->numOfVnodes); + vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); dnodeUpdateEps(dnode->eps, pEps); @@ -122,17 +123,14 @@ void dnodeProcessStatusRsp(Dnode *dnode, SRpcMsg *pMsg) { taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); } -int32_t dnodeInitStatus(Dnode *dnode, DnStatus **out) { +int32_t dnodeInitStatus(DnStatus **out) { DnStatus *status = calloc(1, sizeof(DnStatus)); if (status == NULL) return -1; - - status->dnode = dnode; status->statusTimer = NULL; - status->dnodeTimer = dnode->main->dnodeTimer; + status->dnodeTimer = dnodeInst()->main->dnodeTimer; status->rebootTime = taosGetTimestampSec(); taosTmrReset(dnodeSendStatusMsg, 500, status, status->dnodeTimer, &status->statusTimer); *out = status; - dInfo("dnode status timer is initialized"); return TSDB_CODE_SUCCESS; } diff --git a/source/server/dnode/src/dnodeTelemetry.c b/source/server/dnode/src/dnodeTelem.c similarity index 96% rename from source/server/dnode/src/dnodeTelemetry.c rename to source/server/dnode/src/dnodeTelem.c index 5d1d9b4aa4..6b0bbfa6a0 100644 --- a/source/server/dnode/src/dnodeTelemetry.c +++ b/source/server/dnode/src/dnodeTelem.c @@ -18,7 +18,7 @@ #include "tbuffer.h" #include "tglobal.h" #include "dnodeCfg.h" -#include "dnodeTelemetry.h" +#include "dnodeTelem.h" #include "mnode.h" #define TELEMETRY_SERVER "telemetry.taosdata.com" @@ -163,7 +163,7 @@ static void dnodeAddVersionInfo(DnTelem* telem, SBufferWriter* bw) { static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) { SMnodeStat stat = {0}; - if (mnodeGetStatistics(telem->dnode->mnode, &stat) != 0) { + if (mnodeGetStatistics(&stat) != 0) { return; } @@ -192,9 +192,10 @@ static void dnodeSendTelemetryReport(DnTelem* telem) { return; } + Dnode *dnode = dnodeInst(); SBufferWriter bw = tbufInitWriter(NULL, false); dnodeBeginObject(&bw); - dnodeAddStringField(&bw, "instanceId", telem->dnode->cfg->clusterId); + dnodeAddStringField(&bw, "instanceId", dnode->cfg->clusterId); dnodeAddIntField(&bw, "reportVersion", 1); dnodeAddOsInfo(&bw); dnodeAddCpuInfo(&bw); @@ -243,7 +244,7 @@ static void* dnodeTelemThreadFp(void* param) { if (r == 0) break; if (r != ETIMEDOUT) continue; - if (mnodeIsServing(telem->dnode->mnode)) { + if (mnodeIsServing()) { dnodeSendTelemetryReport(telem); } end.tv_sec += REPORT_INTERVAL; @@ -265,11 +266,10 @@ static void dnodeGetEmail(DnTelem* telem, char* filepath) { taosCloseFile(fd); } -int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) { +int32_t dnodeInitTelem(DnTelem** out) { DnTelem* telem = calloc(1, sizeof(DnTelem)); if (telem == NULL) return -1; - telem->dnode = dnode; telem->enable = tsEnableTelemetryReporting; *out = telem; @@ -296,7 +296,7 @@ int32_t dnodeInitTelemetry(Dnode* dnode, DnTelem** out) { return 0; } -void dnodeCleanupTelemetry(DnTelem** out) { +void dnodeCleanupTelem(DnTelem** out) { DnTelem* telem = *out; *out = NULL; diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 4d03338835..c6139101bb 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -29,13 +29,13 @@ #include "vnode.h" #include "mnode.h" -static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - Dnode * dnode = trans->dnode; +static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + Dnode * dnode = dnodeInst(); SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dnodeProcessStartupReq(dnode, pMsg); + dnodeProcessStartupReq(pMsg); return; } @@ -53,9 +53,9 @@ static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet return; } - DnMsgFp fp = trans->fpPeerMsg[pMsg->msgType]; - if (fp.fp != NULL) { - (*fp.fp)(fp.module, pMsg); + RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType]; + if (fp != NULL) { + (*fp)(pMsg); } else { dDebug("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; @@ -65,41 +65,36 @@ static void dnodeProcessPeerReq(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet } int32_t dnodeInitServer(DnTrans *trans) { - struct Dnode *dnode = trans->dnode; - struct Vnode *vnode = dnode->vnode; - struct Mnode *mnode = dnode->mnode; - - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_TABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_TABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_TABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_STABLE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_SYNC_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_VNODE] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_STREAM] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessConfigDnodeReq}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_MNODE] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessCreateMnodeReq}; - - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_AUTH] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_GRANT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_STATUS] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; + + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg; + + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq; + + trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = tsDnodeDnodePort; rpcInit.label = "DND-S"; rpcInit.numOfThreads = 1; - rpcInit.cfp = (RpcMsgCfp)dnodeProcessPeerReq; + rpcInit.cfp = dnodeProcessPeerReq; rpcInit.sessions = TSDB_MAX_VNODES << 4; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.owner = trans; trans->serverRpc = rpcOpen(&rpcInit); if (trans->serverRpc == NULL) { @@ -119,8 +114,8 @@ void dnodeCleanupServer(DnTrans *trans) { } } -static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - Dnode *dnode = trans->dnode; +static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + Dnode *dnode = dnodeInst(); if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; dTrace("msg:%p is ignored since dnode is stopping", pMsg); @@ -132,9 +127,9 @@ static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pE dnodeUpdateMnodeFromPeer(dnode->meps, pEpSet); } - DnMsgFp fp = trans->fpPeerMsg[pMsg->msgType]; - if (fp.fp != NULL) { - (*fp.fp)(fp.module, pMsg); + RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType]; + if (fp != NULL) { + (*fp)(pMsg); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; @@ -147,43 +142,39 @@ static void dnodeProcessRspFromPeer(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pE } int32_t dnodeInitClient(DnTrans *trans) { - struct Dnode *dnode = trans->dnode; - struct Mnode *mnode = dnode->mnode; - - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_AUTH_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_GRANT_RSP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpPeerMsg[TSDB_MSG_TYPE_DM_STATUS_RSP] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessStatusRsp}; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; + + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg; + + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg; + + trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg; + trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.label = "DND-C"; rpcInit.numOfThreads = 1; - rpcInit.cfp = (RpcMsgCfp)dnodeProcessRspFromPeer; + rpcInit.cfp = dnodeProcessRspFromPeer; rpcInit.sessions = TSDB_MAX_VNODES << 4; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "t"; rpcInit.ckey = "key"; rpcInit.secret = secret; - rpcInit.owner = trans; trans->clientRpc = rpcOpen(&rpcInit); if (trans->clientRpc == NULL) { @@ -203,8 +194,8 @@ void dnodeCleanupClient(DnTrans *trans) { } } -static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - Dnode * dnode = trans->dnode; +static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + Dnode * dnode = dnodeInst(); SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; @@ -222,15 +213,16 @@ static void dnodeProcessMsgFromShell(DnTrans *trans, SRpcMsg *pMsg, SRpcEpSet *p return; } + DnTrans *trans = dnode->trans; if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { atomic_fetch_add_32(&trans->queryReqNum, 1); } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { atomic_fetch_add_32(&trans->submitReqNum, 1); } else {} - DnMsgFp fp = trans->fpShellMsg[pMsg->msgType]; - if (fp.fp != NULL) { - (*fp.fp)(fp.module, pMsg); + RpcMsgFp fp = trans->shellMsgFp[pMsg->msgType]; + if (fp != NULL) { + (*fp)(pMsg); } else { dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; @@ -254,32 +246,34 @@ static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secr return -1; } -void dnodeSendMsgToDnode(Dnode *dnode, SRpcEpSet *epSet, SRpcMsg *rpcMsg) { +void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { + Dnode *dnode = dnodeInst(); rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL); } -void dnodeSendMsgToMnode(Dnode *dnode, SRpcMsg *rpcMsg) { +void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { + Dnode * dnode = dnodeInst(); SRpcEpSet epSet = {0}; dnodeGetEpSetForPeer(dnode->meps, &epSet); - dnodeSendMsgToDnode(dnode, &epSet, rpcMsg); + dnodeSendMsgToDnode(&epSet, rpcMsg); } -void dnodeSendMsgToMnodeRecv(Dnode *dnode, SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { +void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { + Dnode * dnode = dnodeInst(); SRpcEpSet epSet = {0}; dnodeGetEpSetForPeer(dnode->meps, &epSet); rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp); } -void dnodeSendMsgToDnodeRecv(Dnode *dnode, SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { +void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { + Dnode *dnode = dnodeInst(); rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp); } -static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - DnTrans *trans = owner; - +static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (dnodeAuthNetTest(user, spi, encrypt, secret, ckey) == 0) return 0; - int32_t code = mnodeRetriveAuth(trans->dnode->mnode, user, spi, encrypt, secret, ckey); + int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); if (code != TSDB_CODE_APP_NOT_READY) return code; SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); @@ -292,7 +286,7 @@ static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, cha dDebug("user:%s, send auth msg to mnodes", user); SRpcMsg rpcRsp = {0}; - dnodeSendMsgToMnodeRecv(trans->dnode, &rpcMsg, &rpcRsp); + dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); @@ -310,55 +304,51 @@ static int32_t dnodeRetrieveUserAuthInfo(void *owner, char *user, char *spi, cha } int32_t dnodeInitShell(DnTrans *trans) { - struct Dnode *dnode = trans->dnode; - struct Vnode *vnode = dnode->vnode; - struct Mnode *mnode = dnode->mnode; - - trans->fpShellMsg[TSDB_MSG_TYPE_SUBMIT] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_QUERY] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_FETCH] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = (DnMsgFp){.module = vnode, .fp = (RpcMsgFp)vnodeProcessMsg}; + trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; // the following message shall be treated as mnode write - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_ACCT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_ACCT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_ACCT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_USER] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_USER] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_USER] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_DNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_DNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_TP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_SYNC_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_TP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_TP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CREATE_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_DROP_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_TABLE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_ALTER_STREAM] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_KILL_QUERY] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_KILL_STREAM] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_KILL_CONN] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; // the following message shall be treated as mnode query - trans->fpShellMsg[TSDB_MSG_TYPE_CM_HEARTBEAT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_CONNECT] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_USE_DB] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_TABLE_META] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_TABLES_META] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_SHOW] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_RETRIEVE] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - trans->fpShellMsg[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = (DnMsgFp){.module = mnode, .fp = (RpcMsgFp)mnodeProcessMsg}; - - trans->fpShellMsg[TSDB_MSG_TYPE_NETWORK_TEST] = (DnMsgFp){.module = dnode, .fp = (RpcMsgFp)dnodeProcessStartupReq}; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; + trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg; + + trans->shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { @@ -370,12 +360,11 @@ int32_t dnodeInitShell(DnTrans *trans) { rpcInit.localPort = tsDnodeShellPort; rpcInit.label = "SHELL"; rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = (RpcMsgCfp)dnodeProcessMsgFromShell; + rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.sessions = tsMaxShellConns; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.afp = dnodeRetrieveUserAuthInfo; - rpcInit.owner = trans; trans->shellRpc = rpcOpen(&rpcInit); if (trans->shellRpc == NULL) { @@ -394,11 +383,10 @@ void dnodeCleanupShell(DnTrans *trans) { } } -int32_t dnodeInitTrans(Dnode *dnode, DnTrans **out) { - DnTrans* trans = calloc(1, sizeof(DnTrans)); +int32_t dnodeInitTrans(DnTrans **out) { + DnTrans *trans = calloc(1, sizeof(DnTrans)); if (trans == NULL) return -1; - trans->dnode = dnode; *out = trans; if (dnodeInitClient(trans) != 0) { diff --git a/source/server/mnode/src/mnodeMain.c b/source/server/mnode/src/mnodeMain.c index 3747534e36..8e5b4692e3 100644 --- a/source/server/mnode/src/mnodeMain.c +++ b/source/server/mnode/src/mnodeMain.c @@ -15,22 +15,18 @@ #include "mnodeInt.h" -struct Mnode *mnodeCreateInstance(SMnodePara para) { - return NULL; -} +int32_t mnodeInit(SMnodePara para) { return 0; } -void mnodeDropInstance(struct Mnode *vnode) {} +void mnodeCleanup() {} -int32_t mnodeDeploy(struct Mnode *mnode, struct SMInfos *minfos) { return 0; } +int32_t mnodeDeploy(struct SMInfos *minfos) { return 0; } -void mnodeUnDeploy(struct Mnode *mnode) {} +void mnodeUnDeploy() {} -bool mnodeIsServing(struct Mnode *mnode) { return false; } +bool mnodeIsServing() { return false; } -int32_t mnodeGetStatistics(struct Mnode *mnode, SMnodeStat *stat) { return 0; } +int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; } -int32_t mnodeRetriveAuth(struct Mnode *mnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - return 0; -} +int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; } -void mnodeProcessMsg(struct Mnode *mnode, SRpcMsg *rpcMsg) {} +void mnodeProcessMsg(SRpcMsg *rpcMsg) {} diff --git a/source/server/server.c b/source/server/server.c index 0e50b1f97f..67e8e7bc58 100644 --- a/source/server/server.c +++ b/source/server/server.c @@ -29,8 +29,8 @@ static void setSignalHandler() { int main(int argc, char const *argv[]) { setSignalHandler(); - struct Dnode *dnode = dnodeCreateInstance(); - if (dnode == NULL) { + int32_t code = dnodeInit(); + if (code != 0) { uInfo("Failed to start TDengine, please check the log at:%s", tsLogDir); exit(EXIT_FAILURE); } @@ -42,7 +42,7 @@ int main(int argc, char const *argv[]) { } uInfo("TDengine is shut down!"); - dnodeDropInstance(dnode); + dnodeCleanup(); return 0; } diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index 0f26041787..ac14d2a756 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -15,16 +15,14 @@ #include "vnodeInt.h" -struct Vnode *vnodeCreateInstance(SVnodePara para) { - return NULL; -} +int32_t vnodeInit(SVnodePara para) { return 0; } -void vnodeDropInstance(struct Vnode *vnode) {} +void vnodeCleanup() {} -int32_t vnodeGetStatistics(struct Vnode *vnode, SVnodeStat *stat) { return 0; } +int32_t vnodeGetStatistics(SVnodeStat *stat) { return 0; } -void vnodeGetStatus(struct Vnode *vnode, struct SStatusMsg *status) {} +void vnodeGetStatus(struct SStatusMsg *status) {} -void vnodeSetAccess(struct Vnode *vnode, struct SVgroupAccess *access, int32_t numOfVnodes) {} +void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes) {} -void vnodeProcessMsg(struct Vnode *vnode, SRpcMsg *msg) {} +void vnodeProcessMsg(SRpcMsg *msg) {} diff --git a/source/util/src/tstep.c b/source/util/src/tstep.c index e307977e70..b04135194a 100644 --- a/source/util/src/tstep.c +++ b/source/util/src/tstep.c @@ -18,32 +18,47 @@ #include "ulog.h" #include "tstep.h" -SSteps *taosStepInit(int32_t maxsize) { +typedef struct SStepObj { + char * name; + void ** self; + InitFp initFp; + CleanupFp cleanupFp; +} SStep; + +typedef struct SSteps { + int32_t cursize; + int32_t maxsize; + SStep * steps; + ReportFp reportFp; +} SSteps; + +SSteps *taosStepInit(int32_t maxsize, ReportFp fp) { SSteps *steps = calloc(1, sizeof(SSteps)); if (steps == NULL) return NULL; steps->maxsize = maxsize; steps->cursize = 0; - steps->steps = calloc(maxsize, sizeof(SStepObj)); + steps->steps = calloc(maxsize, sizeof(SStep)); + steps->reportFp = fp; return steps; } -int32_t taosStepAdd(SSteps *steps, SStepObj *step) { - if (steps == NULL) return - 1; - +int32_t taosStepAdd(struct SSteps *steps, char *name, void **obj, InitFp initFp, CleanupFp cleanupFp) { + if (steps == NULL) return -1; if (steps->cursize >= steps->maxsize) { uError("failed to add step since up to the maxsize"); return -1; } - steps->steps[steps->cursize++] = *step; + SStep step = {.name = name, .self = obj, .initFp = initFp, .cleanupFp = cleanupFp}; + steps->steps[steps->cursize++] = step; return 0; } static void taosStepCleanupImp(SSteps *steps, int32_t pos) { for (int32_t s = pos; s >= 0; s--) { - SStepObj *step = steps->steps + s; + SStep *step = steps->steps + s; uDebug("step:%s will cleanup", step->name); if (step->cleanupFp != NULL) { (*step->cleanupFp)(step->self); @@ -55,14 +70,14 @@ int32_t taosStepExec(SSteps *steps) { if (steps == NULL) return -1; for (int32_t s = 0; s < steps->cursize; s++) { - SStepObj *step = steps->steps + s; + SStep *step = steps->steps + s; if (step->initFp == NULL) continue; - if (step->reportFp != NULL) { - (*step->reportFp)(step->parent, step->name, "start initialize"); + if (steps->reportFp != NULL) { + (*steps->reportFp)(step->name, "start initialize"); } - int32_t code = (*step->initFp)(step->parent, step->self); + int32_t code = (*step->initFp)(step->self); if (code != 0) { uDebug("step:%s will cleanup", step->name); taosStepCleanupImp(steps, s); @@ -71,8 +86,8 @@ int32_t taosStepExec(SSteps *steps) { uInfo("step:%s is initialized", step->name); - if (step->reportFp != NULL) { - (*step->reportFp)(step->parent, step->name, "initialize completed"); + if (steps->reportFp != NULL) { + (*steps->reportFp)(step->name, "initialize completed"); } } -- GitLab