diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index e03d3ffd18c2fb9b8a1e7462ac67e3863ea225ab..1ef3bd579fcb32c30ba241e463f45d8612af72e9 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -43,16 +43,24 @@ typedef struct { int64_t compStorage; } SMnodeLoad; +typedef struct SMnode SMnode; +typedef struct SServer SServer; + +typedef void (*SendMsgToDnodeFp)(SServer *pServer, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef void (*SendMsgToMnodeFp)(SServer *pServer, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectMsgFp)(SServer *pServer, struct SRpcMsg *rpcMsg, bool forShell); +typedef int32_t (*PutMsgToMnodeQFp)(SServer *pServer, SMnodeMsg *pMsg); + typedef struct { - int32_t dnodeId; - int64_t clusterId; - void (*SendMsgToDnode)(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); - void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg); - void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); - int32_t (*PutMsgIntoApplyQueue)(SMnodeMsg *pMsg); + int32_t dnodeId; + int64_t clusterId; + PutMsgToMnodeQFp putMsgToApplyMsgFp; + SendMsgToDnodeFp sendMsgToDnodeFp; + SendMsgToMnodeFp sendMsgToMnodeFp; + SendRedirectMsgFp sendRedirectMsgFp; } SMnodePara; -int32_t mnodeInit(SMnodePara para); +SMnode* mnodeCreate(SMnodePara para); void mnodeCleanup(); int32_t mnodeDeploy(SMnodeCfg *pCfg); diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index cb514cef53307b510f50ace0e0eaa1f6a148513b..784672e0ecb88875f34f48603ef0f4d9f0a34663 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -144,6 +144,8 @@ typedef struct { SdbDeleteFp deleteFp; } SSdbTable; +typedef struct SSdb SSdb; + int32_t sdbInit(); void sdbCleanup(); void sdbSetTable(SSdbTable table); diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 52470d60a925ddf0ba7d7a7e33548f28ffbb7325..419c9dfcfc9b0168a8e45d30193cc7398bf8ed79 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -184,10 +184,16 @@ typedef struct { SRpcMsg rpcMsg[]; } SVnodeMsg; +typedef struct SServer SServer; +typedef void (*SendMsgToDnodeFp)(SServer *pServer, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +typedef void (*SendMsgToMnodeFp)(SServer *pServer, struct SRpcMsg *rpcMsg); +typedef void (*SendRedirectMsgFp)(SServer *pServer, struct SRpcMsg *rpcMsg, bool forShell); +typedef int32_t (*PutMsgToVnodeQFp)(SServer *pServer, int32_t vgId, SVnodeMsg *pMsg); + typedef struct { - void (*SendMsgToDnode)(SEpSet *pEpSet, SRpcMsg *pMsg); - void (*SendMsgToMnode)(SRpcMsg *pMsg); - int32_t (*PutMsgIntoApplyQueue)(int32_t vgId, SVnodeMsg *pMsg); + PutMsgToVnodeQFp putMsgToApplyQueueFp; + SendMsgToDnodeFp sendMsgToDnodeFp; + SendMsgToMnodeFp sendMsgToMnodeFp; } SVnodePara; int32_t vnodeInit(SVnodePara); diff --git a/source/dnode/mgmt/inc/dnodeDnode.h b/source/dnode/mgmt/inc/dnodeDnode.h index 2ca1368e63254d35cb8d8eddaf0b367c7af7d895..0d1e93e60f55130923c6c6569fc4570751d24c23 100644 --- a/source/dnode/mgmt/inc/dnodeDnode.h +++ b/source/dnode/mgmt/inc/dnodeDnode.h @@ -30,7 +30,7 @@ int64_t dnodeGetClusterId(); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetMnodeEpSetForPeer(SEpSet *epSet); void dnodeGetMnodeEpSetForShell(SEpSet *epSet); -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); +void dnodeSendRedirectMsg(SServer *pServer, SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h index a4226a47b838736b540ac9505c9de62eb55d55b6..48da1ee558144a7a32603093036fbad492cba2ad 100644 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ b/source/dnode/mgmt/inc/dnodeInt.h @@ -37,6 +37,9 @@ extern int32_t dDebugFlag; typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet); +typedef struct SServer { +} SServer; + int32_t dnodeInit(); void dnodeCleanup(); diff --git a/source/dnode/mgmt/inc/dnodeTransport.h b/source/dnode/mgmt/inc/dnodeTransport.h index 95ca1b81e59cc89ff75c22cc458d70cf3ffffe03..4a9518fe093cefa279a75b6930f4cf785e15e0a5 100644 --- a/source/dnode/mgmt/inc/dnodeTransport.h +++ b/source/dnode/mgmt/inc/dnodeTransport.h @@ -23,8 +23,8 @@ extern "C" { int32_t dnodeInitTrans(); void dnodeCleanupTrans(); -void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); -void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg); +void dnodeSendMsgToMnode(SServer *pServer, SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(SServer *pServer, SEpSet *epSet, SRpcMsg *rpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 7843075b30041bba1d95c58eb86bbc7ee14df53a..8a326c72d5e2fa23f831a4605f8c9be5040a498a 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -78,7 +78,7 @@ void dnodeGetMnodeEpSetForShell(SEpSet *pEpSet) { pthread_mutex_unlock(&tsDnode.mutex); } -void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) { +void dnodeSendRedirectMsg(SServer *pServer, SRpcMsg *pMsg, bool forShell) { int32_t msgType = pMsg->msgType; SEpSet epSet = {0}; @@ -383,7 +383,7 @@ static void dnodeSendStatusMsg() { contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; - dnodeSendMsgToMnode(&rpcMsg); + dnodeSendMsgToMnode(NULL, &rpcMsg); } static void dnodeUpdateCfg(SDnodeCfg *pCfg) { diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index b6d54a28d2150488c08e8a779643b714022d2654..232af968976ed58716e9f2cbe6a3908b24cdf381 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -38,6 +38,7 @@ static struct { taos_queue pSyncQ; taos_queue pMgmtQ; SSteps *pSteps; + SMnode *pMnode; SRWLatch latch; } tsMnode = {0}; @@ -360,7 +361,7 @@ void dnodeProcessMnodeWriteMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteMnodeMsgToQueue(tsMnode.pWriteQ, pMsg); dnodeReleaseMnode(); } else { - dnodeSendRedirectMsg(pMsg, 0); + dnodeSendRedirectMsg(NULL, pMsg, 0); } } @@ -381,7 +382,7 @@ void dnodeProcessMnodeReadMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { dnodeWriteMnodeMsgToQueue(tsMnode.pReadQ, pMsg); dnodeReleaseMnode(); } else { - dnodeSendRedirectMsg(pMsg, 0); + dnodeSendRedirectMsg(NULL, pMsg, 0); } } @@ -505,11 +506,15 @@ static int32_t dnodeInitMnodeModule() { SMnodePara para; para.dnodeId = dnodeGetDnodeId(); para.clusterId = dnodeGetClusterId(); - para.SendMsgToDnode = dnodeSendMsgToDnode; - para.SendMsgToMnode = dnodeSendMsgToMnode; - para.SendRedirectMsg = dnodeSendRedirectMsg; + para.sendMsgToDnodeFp = dnodeSendMsgToDnode; + para.sendMsgToMnodeFp = dnodeSendMsgToMnode; + para.sendMsgToMnodeFp = dnodeSendRedirectMsg; - return mnodeInit(para); + tsMnode.pMnode = mnodeCreate(para); + if (tsMnode.pMnode != NULL) { + return -1; + } + return 0; } static void dnodeCleanupMnodeModule() { mnodeCleanup(); } diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index ed5650ace87ad7707f4264addca0fb09098bc8ed..b3263aadca74e22151c3d247d9d6e36421d940cd 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -382,14 +382,14 @@ void dnodeCleanupTrans() { dnodeCleanupClient(); } -void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) { +void dnodeSendMsgToDnode(SServer *pServer, SEpSet *epSet, SRpcMsg *rpcMsg) { #if 0 rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); #endif } -void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { +void dnodeSendMsgToMnode(SServer *pServer, SRpcMsg *rpcMsg) { SEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); - dnodeSendMsgToDnode(&epSet, rpcMsg); + dnodeSendMsgToDnode(NULL, &epSet, rpcMsg); } \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index 4ec9e1dc6082119c97fcd52e6a37a3e69f49e204..bd15850c42189084a6a40ade7d20a2e6dcbfb0bf 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -815,7 +815,7 @@ void dnodeProcessVnodeFetchMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { } } -static int32_t dnodePutMsgIntoVnodeApplyQueue(int32_t vgId, SVnodeMsg *pMsg) { +static int32_t dnodePutMsgIntoVnodeApplyQueue(SServer *pServer, int32_t vgId, SVnodeMsg *pMsg) { SVnodeObj *pVnode = dnodeAcquireVnode(vgId); if (pVnode == NULL) { return terrno; @@ -973,9 +973,9 @@ static void dnodeCleanupVnodeSyncWorker() { tMWorkerCleanup(&tsVnodes.syncPool); static int32_t dnodeInitVnodeModule() { SVnodePara para; - para.SendMsgToDnode = dnodeSendMsgToDnode; - para.SendMsgToMnode = dnodeSendMsgToMnode; - para.PutMsgIntoApplyQueue = dnodePutMsgIntoVnodeApplyQueue; + para.sendMsgToDnodeFp = dnodeSendMsgToDnode; + para.sendMsgToMnodeFp = dnodeSendMsgToMnode; + para.putMsgToApplyQueueFp = dnodePutMsgIntoVnodeApplyQueue; return vnodeInit(para); } diff --git a/source/dnode/mnode/impl/inc/mnodeInt.h b/source/dnode/mnode/impl/inc/mnodeInt.h index 373be6aa84523cea295ea3397d4ac9a2da800534..b0005acc20e456c4b700866e03aa230c999858db 100644 --- a/source/dnode/mnode/impl/inc/mnodeInt.h +++ b/source/dnode/mnode/impl/inc/mnodeInt.h @@ -26,6 +26,16 @@ extern "C" { typedef int32_t (*MnodeRpcFp)(SMnodeMsg *pMsg); +typedef struct SMnodeBak { + int32_t dnodeId; + int64_t clusterId; + tmr_h timer; + SSteps *pInitSteps; + SSteps *pStartSteps; + SMnodePara para; + MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; +} SMnodeBak; + typedef struct SMnode { int32_t dnodeId; int64_t clusterId; @@ -34,15 +44,22 @@ typedef struct SMnode { SSteps *pStartSteps; SMnodePara para; MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; + + struct SSdb *pSdb; + struct SServer *pServer; + PutMsgToMnodeQFp putMsgToApplyMsgFp; + SendMsgToDnodeFp sendMsgToDnodeFp; + SendMsgToMnodeFp sendMsgToMnodeFp; + SendRedirectMsgFp sendRedirectMsgFp; } SMnode; tmr_h mnodeGetTimer(); int32_t mnodeGetDnodeId(); int64_t mnodeGetClusterId(); -void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); -void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); +void mnodeSendMsgToDnode(SMnode *pMnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); +void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg); +void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell); void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index ae909917de1fd7fd722c3a04bf43cf42e6b522e0..9ea4ebe0e606a067b0c7ee647363c3f83731cad8 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -35,17 +35,26 @@ #include "mnodeVgroup.h" #include "mnodeTrans.h" -SMnode tsMint = {0}; +SMnodeBak tsMint = {0}; int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; } int64_t mnodeGetClusterId() { return tsMint.para.clusterId; } -void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { (*tsMint.para.SendMsgToDnode)(epSet, rpcMsg); } +void mnodeSendMsgToDnode(SMnode *pMnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { + assert(pMnode); + (*pMnode->sendMsgToDnodeFp)(pMnode->pServer, epSet, rpcMsg); +} -void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.para.SendMsgToMnode)(rpcMsg); } +void mnodeSendMsgToMnode(SMnode *pMnode, struct SRpcMsg *rpcMsg) { + assert(pMnode); + (*pMnode->sendMsgToMnodeFp)(pMnode->pServer, rpcMsg); +} -void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.para.SendRedirectMsg)(rpcMsg, forShell); } +void mnodeSendRedirectMsg(SMnode *pMnode, struct SRpcMsg *rpcMsg, bool forShell) { + assert(pMnode); + (*pMnode->sendRedirectMsgFp)(pMnode->pServer, rpcMsg, forShell); +} static int32_t mnodeInitTimer() { if (tsMint.timer == NULL) { @@ -68,35 +77,40 @@ static void mnodeCleanupTimer() { tmr_h mnodeGetTimer() { return tsMint.timer; } -static int32_t mnodeSetPara(SMnodePara para) { - tsMint.para = para; +static int32_t mnodeSetPara(SMnode *pMnode, SMnodePara para) { + pMnode->dnodeId = para.dnodeId; + pMnode->clusterId = para.clusterId; + pMnode->putMsgToApplyMsgFp = para.putMsgToApplyMsgFp; + pMnode->sendMsgToDnodeFp = para.sendMsgToDnodeFp; + pMnode->sendMsgToMnodeFp = para.sendMsgToMnodeFp; + pMnode->sendRedirectMsgFp = para.sendRedirectMsgFp; - if (tsMint.para.SendMsgToDnode == NULL) { + if (pMnode->sendMsgToDnodeFp == NULL) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (tsMint.para.SendMsgToMnode == NULL) { + if (pMnode->sendMsgToMnodeFp == NULL) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (tsMint.para.SendRedirectMsg == NULL) { + if (pMnode->sendRedirectMsgFp == NULL) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (tsMint.para.PutMsgIntoApplyQueue == NULL) { + if (pMnode->putMsgToApplyMsgFp == NULL) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (tsMint.para.dnodeId < 0) { + if (pMnode->dnodeId < 0) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } - if (tsMint.para.clusterId < 0) { + if (pMnode->clusterId < 0) { terrno = TSDB_CODE_MND_APP_ERROR; return -1; } @@ -142,23 +156,27 @@ static int32_t mnodeAllocStartSteps() { return 0; } -int32_t mnodeInit(SMnodePara para) { - if (mnodeSetPara(para) != 0) { +SMnode *mnodeCreate(SMnodePara para) { + SMnode *pMnode = calloc(1, sizeof(SMnode)); + + if (mnodeSetPara(pMnode, para) != 0) { + free(pMnode); mError("failed to init mnode para since %s", terrstr()); - return -1; + return NULL; } if (mnodeAllocInitSteps() != 0) { mError("failed to alloc init steps since %s", terrstr()); - return -1; + return NULL; } if (mnodeAllocStartSteps() != 0) { mError("failed to alloc start steps since %s", terrstr()); - return -1; + return NULL; } - return taosStepExec(tsMint.pInitSteps); + taosStepExec(tsMint.pInitSteps); + return NULL; } void mnodeCleanup() { taosStepCleanup(tsMint.pInitSteps); } @@ -234,7 +252,7 @@ void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { void mnodeProcessMsg(SMnodeMsg *pMsg, EMnMsgType msgType) { if (!mnodeIsMaster()) { - mnodeSendRedirectMsg(&pMsg->rpcMsg, true); + mnodeSendRedirectMsg(NULL, &pMsg->rpcMsg, true); mnodeCleanupMsg(pMsg); return; } diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index aa0b3c8a58560e826bcf9f55fbf4ebcc24e7272b..2b3c577ba9bcb2ee5d5750e50f228223559a5b61 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -52,7 +52,7 @@ typedef struct SSdbRow { char pObj[]; } SSdbRow; -typedef struct { +typedef struct SSdb { char *currDir; char *syncDir; char *tmpDir; @@ -67,9 +67,9 @@ typedef struct { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; -} SSdbMgr; +} SSdb; -extern SSdbMgr tsSdb; +extern SSdb tsSdb; int32_t sdbWriteImp(SSdbRaw *pRaw); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 8e9b7fbeccbf9e626715db812268fe77dfe7bdd5..83496f379454be4874d196b99d886b11239b064e 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -17,7 +17,7 @@ #include "sdbInt.h" #include "tglobal.h" -SSdbMgr tsSdb = {0}; +SSdb tsSdb = {0}; int32_t sdbInit() { char path[PATH_MAX + 100];