diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 94e2bdf7e122b7a9ef80a82148dec9a2ac5328f2..841824a7c75c4d5c9f2fb858e7690f29d266ca6b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -404,7 +404,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { return -1; } - tsNumOfThreadsPerCore = cfgGetItem(pCfg, "maxTmrCtrl")->fval; + tsNumOfThreadsPerCore = cfgGetItem(pCfg, "numOfThreadsPerCore")->fval; tsMaxTmrCtrl = cfgGetItem(pCfg, "maxTmrCtrl")->i32; tsRpcTimer = cfgGetItem(pCfg, "rpcTimer")->i32; tsRpcMaxTime = cfgGetItem(pCfg, "rpcMaxTime")->i32; diff --git a/source/dnode/mgmt/bnode/inc/bmHandle.h b/source/dnode/mgmt/bnode/inc/bmHandle.h index a24884849c36359d82d1d88b947ec1a48239e6da..b4fecd36c4f8d6a948e44d29c1aa27ae1de9e9d5 100644 --- a/source/dnode/mgmt/bnode/inc/bmHandle.h +++ b/source/dnode/mgmt/bnode/inc/bmHandle.h @@ -25,6 +25,10 @@ extern "C" { void bmInitMsgHandles(SMgmtWrapper *pWrapper); SMsgHandle bmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); +int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index 1390980423547563977d015f22a8cd9de6c8ee6f..55fde8d4f850dfe0550e7f1383bb3cb449a3b47b 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -37,14 +37,14 @@ typedef struct SBnodeMgmt { bool singleProc; } SBnodeMgmt; -SMgmtFp bmGetMgmtFp(); +void bmGetMgmtFp(SMgmtWrapper *pMgmt); int32_t dndInitBnode(SDnode *pDnode); void dndCleanupBnode(SDnode *pDnode); void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/inc/bmWorker.h b/source/dnode/mgmt/bnode/inc/bmWorker.h index 3a5c2e7169480f568f724a7f807d3ebe1de7b657..ab45c5340828476c8698f4eb4eabccb702478635 100644 --- a/source/dnode/mgmt/bnode/inc/bmWorker.h +++ b/source/dnode/mgmt/bnode/inc/bmWorker.h @@ -31,9 +31,9 @@ int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void bmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void bmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void bmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void bmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void bmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void bmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmHandle.c b/source/dnode/mgmt/bnode/src/bmHandle.c index fd90e0c915a87771bd68de934907ca3da9473e4f..100921833a1d816afeb128cf8b0337bf1f30fb6a 100644 --- a/source/dnode/mgmt/bnode/src/bmHandle.c +++ b/source/dnode/mgmt/bnode/src/bmHandle.c @@ -17,6 +17,10 @@ #include "bmHandle.h" #include "bmWorker.h" +int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} + + void bmInitMsgHandles(SMgmtWrapper *pWrapper) { } diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index 300dc1ec8df8577fe3e698204e8511daf8f2dc9f..7ac360adc5ed68a9405e05dc8103ccff0d9bd0ce 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -19,12 +19,14 @@ bool bmRequireNode(SMgmtWrapper *pWrapper) { return false; } - -SMgmtFp bmGetMgmtFp() { +void bmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = bmRequireNode; mgmtFp.getMsgHandleFp = bmGetMsgHandle; - return mgmtFp; + + // bmInitMsgHandles(pWrapper); + pWrapper->name = "snode"; + pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/bnode/src/bmMgmt.c b/source/dnode/mgmt/bnode/src/bmMgmt.c index 003cf19c628af0618ebe2df199719fd5512a8fa0..6c86850d3bbe5221832e9261855a054069f5085f 100644 --- a/source/dnode/mgmt/bnode/src/bmMgmt.c +++ b/source/dnode/mgmt/bnode/src/bmMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE // #include "dndBnode.h" -// #include "dmMgmt.h" +// #include "dmInt.h" // #include "dndTransport.h" // #include "dndWorker.h" @@ -261,7 +261,7 @@ static int32_t dndDropBnode(SDnode *pDnode) { return 0; } -int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t bmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { SDCreateBnodeReq createReq = {0}; if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -277,7 +277,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t bmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { SDDropBnodeReq dropReq = {0}; if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 49e9548d9a865cd081f053b3689474a5850f6792..1af2e0f24afe77b79fec5b2eab9c76070e58f7db 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -72,8 +72,8 @@ typedef struct SQnodeMgmt SQnodeMgmt; typedef struct SSnodeMgmt SSnodeMgmt; typedef struct SBnodeMgmt SBnodeMgmt; -typedef void (*RpcMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps); -typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +typedef void (*RpcMsgFp)(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps); +typedef void (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); @@ -132,7 +132,6 @@ typedef struct SDnode { SStartupReq startup; TdFilePtr pLockFile; STransMgmt trans; - SMgmtFp fps[NODE_MAX]; SMgmtWrapper wrappers[NODE_MAX]; } SDnode; @@ -155,7 +154,7 @@ SDnode *dndCreate(SDndCfg *pCfg); void dndClose(SDnode *pDnode); int32_t dndRun(SDnode *pDnode); void dndeHandleEvent(SDnode *pDnode, EDndEvent event); -void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); // dndTransport.h int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/container/inc/dndNode.h b/source/dnode/mgmt/container/inc/dndNode.h index 9e5dff6e25e8332bb87c4053919e726fcd3b6ded..7a922480216e6b623ff4315903b9af09137a66dd 100644 --- a/source/dnode/mgmt/container/inc/dndNode.h +++ b/source/dnode/mgmt/container/inc/dndNode.h @@ -26,7 +26,7 @@ SDnode *dndCreate(SDndCfg *pCfg); void dndClose(SDnode *pDnode); int32_t dndRun(SDnode *pDnode); void dndeHandleEvent(SDnode *pDnode, EDndEvent event); -void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 9bb241f45ace32b9b4b39db8b8007328e48a184b..cb82e88d148fdd6bdb0e4996d4a6e41a3aa8cebc 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -90,18 +90,12 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } - pDnode->wrappers[DNODE].fp = dmGetMgmtFp(); - pDnode->wrappers[MNODE].fp = mmGetMgmtFp(); - pDnode->wrappers[VNODES].fp = vmGetMgmtFp(); - pDnode->wrappers[QNODE].fp = qmGetMgmtFp(); - pDnode->wrappers[SNODE].fp = smGetMgmtFp(); - pDnode->wrappers[BNODE].fp = bmGetMgmtFp(); - pDnode->wrappers[DNODE].name = "dnode"; - pDnode->wrappers[MNODE].name = "mnode"; - pDnode->wrappers[VNODES].name = "vnodes"; - pDnode->wrappers[QNODE].name = "qnode"; - pDnode->wrappers[SNODE].name = "snode"; - pDnode->wrappers[BNODE].name = "bnode"; + dmGetMgmtFp(&pDnode->wrappers[DNODE]); + mmGetMgmtFp(&pDnode->wrappers[MNODE]); + vmGetMgmtFp(&pDnode->wrappers[VNODES]); + qmGetMgmtFp(&pDnode->wrappers[QNODE]); + smGetMgmtFp(&pDnode->wrappers[SNODE]); + bmGetMgmtFp(&pDnode->wrappers[BNODE]); memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); if (dndSetMsgHandle(pDnode) != 0) { @@ -263,5 +257,8 @@ void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { pDnode->event = event; } - -void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {} \ No newline at end of file +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) { + if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) { + dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index d99ff25a4f91e950f712188f14764f2d899c2f73..f4aa43f39c1448d622dfcc5e1984f37a6deda9db 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -39,7 +39,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { if (pHandle->rpcMsgFp != NULL) { dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle); - (*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pRsp, pEpSet); + (*pHandle->rpcMsgFp)(pHandle->pWrapper, pRsp, pEpSet); } else { dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); rpcFreeCont(pRsp->pCont); @@ -121,7 +121,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { if (pHandle->rpcMsgFp != NULL) { dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name, pReq->ahandle); - (*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pReq, pEpSet); + (*pHandle->rpcMsgFp)(pHandle->pWrapper, pReq, pEpSet); } else { dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle}; @@ -251,7 +251,7 @@ int32_t dndSetMsgHandle(SDnode *pDnode) { for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; - GetMsgHandleFp getMsgHandleFp = pDnode->fps[nodeType].getMsgHandleFp; + GetMsgHandleFp getMsgHandleFp = pWrapper->fp.getMsgHandleFp; if (getMsgHandleFp == NULL) continue; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { diff --git a/source/dnode/mgmt/dnode/inc/dmHandle.h b/source/dnode/mgmt/dnode/inc/dmHandle.h index 873423c9b2db41eab1fc89800e43accae9def139..fde1d8b2d77d600142ae4dd267a4b61dadcf3326 100644 --- a/source/dnode/mgmt/dnode/inc/dmHandle.h +++ b/source/dnode/mgmt/dnode/inc/dmHandle.h @@ -25,8 +25,12 @@ extern "C" { void dmInitMsgHandles(SMgmtWrapper *pWrapper); SMsgHandle dmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); -void dmSendStatusReq(SDnode *pDnode); -void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); +void dmSendStatusReq(SDnodeMgmt *pMgmt); +void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); +int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq); +void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp); +void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp); +void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index 0c0208be646076a3145feae2d9f3cb676cb25a6e..1e5fcd33ed0a1d92b1aec99e5e9e32414f59b03a 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -26,6 +26,8 @@ typedef struct SDnodeMgmt { int32_t dnodeId; int32_t dropped; int64_t clusterId; + char localEp[TSDB_EP_LEN]; + char firstEp[TSDB_EP_LEN]; int64_t dver; int64_t updateTime; int8_t statusSent; @@ -42,12 +44,13 @@ typedef struct SDnodeMgmt { } SDnodeMgmt; // dmInt.h -SMgmtFp dmGetMgmtFp(); +void dmGetMgmtFp(SMgmtWrapper *pWrapper); int32_t dmGetDnodeId(SDnode *pDnode); int64_t dmGetClusterId(SDnode *pDnode); - -// dmMgmt.h -void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); +void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); +void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); +void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); +void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); // dmHandle.h void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/dnode/inc/dmMgmt.h b/source/dnode/mgmt/dnode/inc/dmMgmt.h deleted file mode 100644 index a5fad093bb3d968fb28945863750d26c82eb3c20..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/dnode/inc/dmMgmt.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DND_DNODE_MGMT_H_ -#define _TD_DND_DNODE_MGMT_H_ - -#include "dmInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t dmInit(SMgmtWrapper *pWrapper); -void dmCleanup(SMgmtWrapper *pWrapper); -bool dmRequire(SMgmtWrapper *pWrapper); - -void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); -void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); - -void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); -void dmProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) ; - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DND_DNODE_MGMT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/inc/dmWorker.h b/source/dnode/mgmt/dnode/inc/dmWorker.h index f809df1d09d3a3d53c0b88cd334f1d4dd09b2bbe..60c5458e9c129646eecee2d2f6865b1155872645 100644 --- a/source/dnode/mgmt/dnode/inc/dmWorker.h +++ b/source/dnode/mgmt/dnode/inc/dmWorker.h @@ -24,6 +24,7 @@ extern "C" { int32_t dmStartWorker(); void dmStopWorker(); +void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmHandle.c b/source/dnode/mgmt/dnode/src/dmHandle.c index cca12e954eab577586a8276e25ee7e58df300279..2fa3c92967363661df87769a08b70fe9ac1c3437 100644 --- a/source/dnode/mgmt/dnode/src/dmHandle.c +++ b/source/dnode/mgmt/dnode/src/dmHandle.c @@ -15,8 +15,104 @@ #define _DEFAULT_SOURCE #include "dmHandle.h" -#include "dndWorker.h" -#include "dmMgmt.h" +#include "dmFile.h" +#include "dmWorker.h" +#include "vmInt.h" + +void dmSendStatusReq(SDnodeMgmt *pMgmt) { + SDnode *pDnode = pMgmt->pDnode; + SStatusReq req = {0}; + + taosRLockLatch(&pMgmt->latch); + req.sver = tsVersion; + req.dver = pMgmt->dver; + req.dnodeId = pMgmt->dnodeId; + req.clusterId = pMgmt->clusterId; + req.rebootTime = pDnode->rebootTime; + req.updateTime = pMgmt->updateTime; + req.numOfCores = tsNumOfCores; + req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes; + memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); + + req.clusterCfg.statusInterval = tsStatusInterval; + req.clusterCfg.checkTime = 0; + char timestr[32] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + memcpy(req.clusterCfg.timezone, tsTimezone, TD_TIMEZONE_LEN); + memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); + memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); + taosRUnLockLatch(&pMgmt->latch); + + req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); + vmGetVnodeLoads(pDnode, req.pVloads); + + int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); + void *pHead = rpcMallocCont(contLen); + tSerializeSStatusReq(pHead, contLen, &req); + taosArrayDestroy(req.pVloads); + + SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; + pMgmt->statusSent = 1; + + dTrace("pDnode:%p, send status req to mnode", pDnode); + dndSendReqToMnode(pMgmt->pDnode, &rpcMsg); +} + +static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { + if (pMgmt->dnodeId == 0) { + dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); + taosWLockLatch(&pMgmt->latch); + pMgmt->dnodeId = pCfg->dnodeId; + pMgmt->clusterId = pCfg->clusterId; + dmWriteFile(pMgmt); + taosWUnLockLatch(&pMgmt->latch); + } +} + +void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { + SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, MNODE)->pMgmt; + + if (pRsp->code != TSDB_CODE_SUCCESS) { + if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { + dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); + pMgmt->dropped = 1; + dmWriteFile(pMgmt); + } + } else { + SStatusRsp statusRsp = {0}; + if (pRsp->pCont != NULL && pRsp->contLen != 0 && + tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { + pMgmt->dver = statusRsp.dver; + dndUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); + dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps); + } + taosArrayDestroy(statusRsp.pDnodeEps); + } + + pMgmt->statusSent = 0; +} + +void dmProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); } + +void dmProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("grant rsp is received, but not supported yet"); } + +int32_t dmProcessConfigReq(SDnode *pDnode, SRpcMsg *pReq) { + dError("config req is received, but not supported yet"); + SDCfgDnodeReq *pCfg = pReq->pCont; + return TSDB_CODE_OPS_NOT_SUPPORT; +} + +void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { + dDebug("startup req is received"); + + SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); + dndGetStartup(pDnode, pStartup); + + dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); + + SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; + rpcSendResponse(&rpcRsp); +} static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index ee034602bfb861b78be0ccf176bc731f7b14f830..916a7f87406795c7ac54364f79f1b2b932e73516 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -15,8 +15,9 @@ #define _DEFAULT_SOURCE #include "dmInt.h" +#include "dmFile.h" #include "dmHandle.h" -#include "dmMgmt.h" +#include "dmWorker.h" int32_t dmGetDnodeId(SDnode *pDnode) { SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); @@ -38,11 +39,147 @@ int64_t dmGetClusterId(SDnode *pDnode) { return clusterId; } -SMgmtFp dmGetMgmtFp() { +void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + + taosRLockLatch(&pMgmt->latch); + *pEpSet = pMgmt->mnodeEpSet; + taosRUnLockLatch(&pMgmt->latch); +} + +void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); + + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + + taosWLockLatch(&pMgmt->latch); + + pMgmt->mnodeEpSet = *pEpSet; + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } + + taosWUnLockLatch(&pMgmt->latch); +} + +void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { + SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + + taosRLockLatch(&pMgmt->latch); + + SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); + if (pDnodeEp != NULL) { + if (pPort != NULL) { + *pPort = pDnodeEp->ep.port; + } + if (pFqdn != NULL) { + tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); + } + if (pEp != NULL) { + snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); + } + } + + taosRUnLockLatch(&pMgmt->latch); +} + +void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { + tmsg_t msgType = pReq->msgType; + + SEpSet epSet = {0}; + dmGetMnodeEpSet(pDnode, &epSet); + + dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) { + epSet.inUse = (i + 1) % epSet.numOfEps; + } + + epSet.eps[i].port = htons(epSet.eps[i].port); + } + + rpcSendRedirectRsp(pReq->handle, &epSet); +} + +int32_t dmInit(SMgmtWrapper *pWrapper) { + SDnode *pDnode = pWrapper->pDnode; + SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); + + pMgmt->dnodeId = 0; + pMgmt->dropped = 0; + pMgmt->clusterId = 0; + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pDnode; + memcpy(pMgmt->localEp, pDnode->cfg.localEp, TSDB_EP_LEN); + memcpy(pMgmt->firstEp, pDnode->cfg.firstEp, TSDB_EP_LEN); + taosInitRWLatch(&pMgmt->latch); + + pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMgmt->dnodeHash == NULL) { + dError("node:%s, failed to init dnode hash", pWrapper->name); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (dmReadFile(pMgmt) != 0) { + dError("node:%s, failed to read file since %s", pWrapper->name, terrstr()); + return -1; + } + + if (pMgmt->dropped) { + dError("node:%s, will not start since its already dropped", pWrapper->name); + return -1; + } + + if (dmStartWorker(pMgmt) != 0) { + dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr()); + return -1; + } + + dndSetStatus(pDnode, DND_STAT_RUNNING); + dmSendStatusReq(pMgmt); + dndReportStartup(pDnode, "TDengine", "initialized successfully"); + + dInfo("dnode-mgmt is initialized"); + return 0; +} + +void dmCleanup(SMgmtWrapper *pWrapper) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; + + dmStopWorker(pMgmt); + + taosWLockLatch(&pMgmt->latch); + + if (pMgmt->pDnodeEps != NULL) { + taosArrayDestroy(pMgmt->pDnodeEps); + pMgmt->pDnodeEps = NULL; + } + + if (pMgmt->dnodeHash != NULL) { + taosHashCleanup(pMgmt->dnodeHash); + pMgmt->dnodeHash = NULL; + } + + taosWUnLockLatch(&pMgmt->latch); + dInfo("dnode-mgmt is cleaned up"); +} + +bool dmRequire(SMgmtWrapper *pWrapper) { return true; } + +void dmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = dmInit; mgmtFp.closeFp = dmCleanup; mgmtFp.requiredFp = dmRequire; mgmtFp.getMsgHandleFp = dmGetMsgHandle; - return mgmtFp; + + dmInitMsgHandles(pWrapper); + pWrapper->name = "dnode"; + pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/dnode/src/dmMgmt.c b/source/dnode/mgmt/dnode/src/dmMgmt.c deleted file mode 100644 index f377c56b017085870528811c222191580f146029..0000000000000000000000000000000000000000 --- a/source/dnode/mgmt/dnode/src/dmMgmt.c +++ /dev/null @@ -1,361 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dmMgmt.h" -#include "dmWorker.h" -// #include "dmMgmt.h" -#include "dmFile.h" -#include "dmHandle.h" -#include "dndMonitor.h" -// #include "dndBnode.h" -// #include "mm.h" -// #include "dndQnode.h" -// #include "dndSnode.h" -#include "dndTransport.h" -// #include "dndVnodes.h" -#include "dndWorker.h" -// #include "monitor.h" - -#if 0 -static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); - - -static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq); -static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp); -static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pRsp); -static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pRsp); - -void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosRLockLatch(&pMgmt->latch); - - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); - if (pDnodeEp != NULL) { - if (pPort != NULL) { - *pPort = pDnodeEp->ep.port; - } - if (pFqdn != NULL) { - tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); - } - if (pEp != NULL) { - snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); - } - } - - taosRUnLockLatch(&pMgmt->latch); -} - -void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosRLockLatch(&pMgmt->latch); - *pEpSet = pMgmt->mnodeEpSet; - taosRUnLockLatch(&pMgmt->latch); -} - -void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) { - tmsg_t msgType = pReq->msgType; - - SEpSet epSet = {0}; - dmGetMnodeEpSet(pDnode, &epSet); - - dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, pDnode->cfg.localFqdn) == 0 && epSet.eps[i].port == pDnode->cfg.serverPort) { - epSet.inUse = (i + 1) % epSet.numOfEps; - } - - epSet.eps[i].port = htons(epSet.eps[i].port); - } - - rpcSendRedirectRsp(pReq->handle, &epSet); -} - -static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosWLockLatch(&pMgmt->latch); - - pMgmt->mnodeEpSet = *pEpSet; - for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); - } - - taosWUnLockLatch(&pMgmt->latch); -} - - -void dmSendStatusReq(SDnode *pDnode) { - SStatusReq req = {0}; - - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosRLockLatch(&pMgmt->latch); - req.sver = tsVersion; - req.dver = pMgmt->dver; - req.dnodeId = pMgmt->dnodeId; - req.clusterId = pMgmt->clusterId; - req.rebootTime = pMgmt->rebootTime; - req.updateTime = pMgmt->updateTime; - req.numOfCores = tsNumOfCores; - req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes; - memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); - - req.clusterCfg.statusInterval = tsStatusInterval; - req.clusterCfg.checkTime = 0; - char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - memcpy(req.clusterCfg.timezone, tsTimezone, TD_TIMEZONE_LEN); - memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); - memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); - taosRUnLockLatch(&pMgmt->latch); - -#if 0 - req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); - dndGetVnodeLoads(pDnode, req.pVloads); -#endif - - int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); - void *pHead = rpcMallocCont(contLen); - tSerializeSStatusReq(pHead, contLen, &req); - taosArrayDestroy(req.pVloads); - - SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; - pMgmt->statusSent = 1; - - dTrace("pDnode:%p, send status req to mnode", pDnode); - dndSendReqToMnode(pDnode, &rpcMsg); -} - -static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - if (pMgmt->dnodeId == 0) { - dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); - taosWLockLatch(&pMgmt->latch); - pMgmt->dnodeId = pCfg->dnodeId; - pMgmt->clusterId = pCfg->clusterId; - dmWriteFile(pDnode); - taosWUnLockLatch(&pMgmt->latch); - } -} - -static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - if (pRsp->code != TSDB_CODE_SUCCESS) { - if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); - pMgmt->dropped = 1; - dmWriteFile(pDnode); - } - } else { - SStatusRsp statusRsp = {0}; - if (pRsp->pCont != NULL && pRsp->contLen != 0 && - tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - pMgmt->dver = statusRsp.dver; - dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg); - dmUpdateDnodeEps(pDnode, statusRsp.pDnodeEps); - } - taosArrayDestroy(statusRsp.pDnodeEps); - } - - pMgmt->statusSent = 0; -} - -static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pReq) { dError("auth rsp is received, but not supported yet"); } - -static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pReq) { - dError("grant rsp is received, but not supported yet"); -} - -static int32_t dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pReq) { - dError("config req is received, but not supported yet"); - SDCfgDnodeReq *pCfg = pReq->pCont; - return TSDB_CODE_OPS_NOT_SUPPORT; -} - -void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("startup req is received"); - - SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); - dndGetStartup(pDnode, pStartup); - - dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - - SRpcMsg rpcRsp = {.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq)}; - rpcSendResponse(&rpcRsp); -} - -void dmStopMgmt(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - dndCleanupWorker(&pMgmt->mgmtWorker); - dndCleanupWorker(&pMgmt->statusWorker); - - if (pMgmt->threadId != NULL) { - taosDestoryThread(pMgmt->threadId); - pMgmt->threadId = NULL; - } -} - -void dmCleanupMgmt(SDnode *pDnode) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - taosWLockLatch(&pMgmt->latch); - - if (pMgmt->pDnodeEps != NULL) { - taosArrayDestroy(pMgmt->pDnodeEps); - pMgmt->pDnodeEps = NULL; - } - - if (pMgmt->dnodeHash != NULL) { - taosHashCleanup(pMgmt->dnodeHash); - pMgmt->dnodeHash = NULL; - } - - if (pMgmt->file != NULL) { - free(pMgmt->file); - pMgmt->file = NULL; - } - - taosWUnLockLatch(&pMgmt->latch); - dInfo("dnode-mgmt is cleaned up"); -} - -void dmProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SDnodeMgmt *pMgmt = &pDnode->dmgmt; - - if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) { - dndUpdateMnodeEpSet(pDnode, pEpSet); - } - - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; - if (pMsg->msgType == TDMT_MND_STATUS_RSP) { - pWorker = &pMgmt->statusWorker; - } - - if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } -} - -#endif - - - -void dmStopMgmt(SDnode *pDnode) {} - -void dmCleanupMgmt(SDnode *pDnode){} - - -void dmSendStatusReq(SDnode *pDnode){} - - -void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {} - - -void dmProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){} -void dmProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} - -int32_t dmInit(SMgmtWrapper *pWrapper) { - SDnodeMgmt *pMgmt = calloc(1, sizeof(SDnodeMgmt)); - - pMgmt->dnodeId = 0; - pMgmt->dropped = 0; - pMgmt->clusterId = 0; - pMgmt->path = pWrapper->path; - taosInitRWLatch(&pMgmt->latch); - - pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (pMgmt->dnodeHash == NULL) { - dError("node:%s, failed to init dnode hash", pWrapper->name); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (dmReadFile(pMgmt) != 0) { - dError("node:%s, failed to read file since %s", pWrapper->name, terrstr()); - return -1; - } - - if (pMgmt->dropped) { - dError("node:%s, will not start since its already dropped", pWrapper->name); - return -1; - } - - if (dmStartWorker(pMgmt) != 0) { - dError("node:%s, failed to start worker since %s", pWrapper->name, terrstr()); - return -1; - } - - dInfo("dnode-mgmt is initialized"); - return 0; - - // dndSetStatus(pDnode, DND_STAT_RUNNING); - // dmSendStatusReq(pDnode); - // dndReportStartup(pDnode, "TDengine", "initialized successfully"); - -#if 0 - if (dndInitTrans(pDnode) != 0) { - dError("failed to init transport since %s", terrstr()); - return -1; - } - - SDiskCfg dCfg = {0}; - tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN); - dCfg.level = 0; - dCfg.primary = 1; - SDiskCfg *pDisks = pDnode->cfg.pDisks; - int32_t numOfDisks = pDnode->cfg.numOfDisks; - if (numOfDisks <= 0 || pDisks == NULL) { - pDisks = &dCfg; - numOfDisks = 1; - } - - pDnode->pTfs = tfsOpen(pDisks, numOfDisks); - if (pDnode->pTfs == NULL) { - dError("failed to init tfs since %s", terrstr()); - return -1; - } -#endif -} - -void dmCleanup(SMgmtWrapper *pWrapper) { - SDnodeMgmt *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - dmStopWorker(pMgmt); - - taosWLockLatch(&pMgmt->latch); - - if (pMgmt->pDnodeEps != NULL) { - taosArrayDestroy(pMgmt->pDnodeEps); - pMgmt->pDnodeEps = NULL; - } - - if (pMgmt->dnodeHash != NULL) { - taosHashCleanup(pMgmt->dnodeHash); - pMgmt->dnodeHash = NULL; - } - - taosWUnLockLatch(&pMgmt->latch); - dInfo("dnode-mgmt is cleaned up"); -} - -bool dmRequire(SMgmtWrapper *pWrapper) { return true; } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 0fcc05338a368bda1eb635d0a5680c78565b9952..81215ae729acc869b93c95be0d94e3306a88aee1 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -16,7 +16,12 @@ #define _DEFAULT_SOURCE #include "dmWorker.h" #include "dmHandle.h" -#include "dndWorker.h" + +#include "bmInt.h" +#include "mmInt.h" +#include "qmInt.h" +#include "smInt.h" +#include "vmInt.h" static void *dnodeThreadRoutine(void *param) { SDnodeMgmt *pMgmt = param; @@ -37,7 +42,7 @@ static void *dnodeThreadRoutine(void *param) { float statusInterval = (curTime - lastStatusTime) / 1000.0f; if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { - dmSendStatusReq(pDnode); + dmSendStatusReq(pMgmt); lastStatusTime = curTime; } @@ -52,61 +57,60 @@ static void *dnodeThreadRoutine(void *param) { static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { int32_t code = 0; -#if 0 switch (pMsg->msgType) { case TDMT_DND_CREATE_MNODE: - code = mmProcessCreateMnodeReq(pDnode, pMsg); + code = mmProcessCreateReq(pDnode, pMsg); break; case TDMT_DND_ALTER_MNODE: - code = mmProcessAlterMnodeReq(pDnode, pMsg); + code = mmProcessAlterReq(pDnode, pMsg); break; case TDMT_DND_DROP_MNODE: - code = mmProcessDropMnodeReq(pDnode, pMsg); + code = mmProcessDropReq(pDnode, pMsg); break; case TDMT_DND_CREATE_QNODE: - code = dndProcessCreateQnodeReq(pDnode, pMsg); + code = qmProcessCreateReq(pDnode, pMsg); break; case TDMT_DND_DROP_QNODE: - code = dndProcessDropQnodeReq(pDnode, pMsg); + code = qmProcessDropReq(pDnode, pMsg); break; case TDMT_DND_CREATE_SNODE: - code = dndProcessCreateSnodeReq(pDnode, pMsg); + code = smProcessCreateReq(pDnode, pMsg); break; case TDMT_DND_DROP_SNODE: - code = dndProcessDropSnodeReq(pDnode, pMsg); + code = smProcessDropReq(pDnode, pMsg); break; case TDMT_DND_CREATE_BNODE: - code = dndProcessCreateBnodeReq(pDnode, pMsg); + code = bmProcessCreateReq(pDnode, pMsg); break; case TDMT_DND_DROP_BNODE: - code = dndProcessDropBnodeReq(pDnode, pMsg); + code = bmProcessDropReq(pDnode, pMsg); break; case TDMT_DND_CONFIG_DNODE: - code = dndProcessConfigDnodeReq(pDnode, pMsg); + code = dmProcessConfigReq(pDnode, pMsg); break; case TDMT_MND_STATUS_RSP: - dndProcessStatusRsp(pDnode, pMsg); + dmProcessStatusRsp(pDnode, pMsg); break; case TDMT_MND_AUTH_RSP: - dndProcessAuthRsp(pDnode, pMsg); + dmProcessAuthRsp(pDnode, pMsg); break; case TDMT_MND_GRANT_RSP: - dndProcessGrantRsp(pDnode, pMsg); + dmProcessGrantRsp(pDnode, pMsg); break; case TDMT_DND_CREATE_VNODE: - code = dndProcessCreateVnodeReq(pDnode, pMsg); + code = vmProcessCreateVnodeReq(pDnode, pMsg); break; case TDMT_DND_ALTER_VNODE: - code = dndProcessAlterVnodeReq(pDnode, pMsg); + code = vmProcessAlterVnodeReq(pDnode, pMsg); break; case TDMT_DND_DROP_VNODE: - code = dndProcessDropVnodeReq(pDnode, pMsg); + code = vmProcessDropVnodeReq(pDnode, pMsg); break; case TDMT_DND_SYNC_VNODE: - code = dndProcessSyncVnodeReq(pDnode, pMsg); + code = vmProcessSyncVnodeReq(pDnode, pMsg); break; case TDMT_DND_COMPACT_VNODE: - code = dndProcessCompactVnodeReq(pDnode, pMsg); + code = vmProcessCompactVnodeReq(pDnode, pMsg); break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; @@ -114,7 +118,7 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { dError("RPC %p, dnode msg:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); break; } -#endif + if (pMsg->msgType & 1u) { if (code != 0) code = terrno; SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; @@ -156,3 +160,21 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { pMgmt->threadId = NULL; } } + +void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; + + SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { + pWorker = &pMgmt->statusWorker; + } + + if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)) != 0) { + if (pMsg->rpcMsg.msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_OUT_OF_MEMORY}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mmHandle.h b/source/dnode/mgmt/mnode/inc/mmHandle.h index 9bad5ffb788ad88e9d5336fc6413cb6aff9b42f4..df66b1b80c75c70aec39ef6ec9f68b3246952cfb 100644 --- a/source/dnode/mgmt/mnode/inc/mmHandle.h +++ b/source/dnode/mgmt/mnode/inc/mmHandle.h @@ -25,6 +25,10 @@ extern "C" { void mmInitMsgHandles(SMgmtWrapper *pWrapper); SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); +int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); + int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, SMonGrantInfo *pGrantInfo); diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index 3252a060027f2bbc7e339216d56410c6e32b7736..60e3f2211ab7ecdc69fd141c2c0065b2664dd2f1 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -45,13 +45,15 @@ typedef struct SMnodeMgmt { // interface -SMgmtFp mmGetMgmtFp(); +void mmGetMgmtFp(SMgmtWrapper *pMgmt); int32_t mmInit(SDnode *pDnode); void mmCleanup(SDnode *pDnode); -int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); + +// mmHandle.h +int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, diff --git a/source/dnode/mgmt/mnode/inc/mmWorker.h b/source/dnode/mgmt/mnode/inc/mmWorker.h index 0ffe109cbd2a58fd8ab261003b7858842e77ab2e..553786f444d735cb0d3499ffe51f274d9f1078a7 100644 --- a/source/dnode/mgmt/mnode/inc/mmWorker.h +++ b/source/dnode/mgmt/mnode/inc/mmWorker.h @@ -31,9 +31,9 @@ int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void mmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void mmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void mmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/src/mmHandle.c b/source/dnode/mgmt/mnode/src/mmHandle.c index 49688d8a3e335266388d151ed9d97a113a89dbec..5ea3480027f1253a0942550d2a7a2ea83f4c2bae 100644 --- a/source/dnode/mgmt/mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mnode/src/mmHandle.c @@ -18,9 +18,9 @@ #include "mmWorker.h" #if 0 -#include "dmMgmt.h" +#include "dmInt.h" -int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { SDCreateMnodeReq createReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -52,7 +52,7 @@ int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return mmOpen(pDnode, &option); } -int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pReq) { SDAlterMnodeReq alterReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -86,7 +86,7 @@ int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return code; } -int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { SDDropMnodeReq dropReq = {0}; if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -142,6 +142,10 @@ int32_t mmGetUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char #endif +int32_t mmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t mmProcessAlterReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t mmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} + static void mmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index fdd5919e0182b20a787ca53d008b4f2bdbdb06fa..b08c11cca6aa09104261cb4265befa68f87bae63 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -19,14 +19,16 @@ bool mmRequireNode(SMgmtWrapper *pWrapper) { return false; } - -SMgmtFp mmGetMgmtFp() { +void mmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = mmRequireNode; mgmtFp.getMsgHandleFp = mmGetMsgHandle; - return mgmtFp; + + mmInitMsgHandles(pWrapper); + pWrapper->name = "mnode"; + pWrapper->fp = mgmtFp; } int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) { diff --git a/source/dnode/mgmt/mnode/src/mmMgmt.c b/source/dnode/mgmt/mnode/src/mmMgmt.c index 222f1255159e93a86f3a3a98433c7bb107b064eb..c7e3f39814287223ea34516d90f61fa2faa73e52 100644 --- a/source/dnode/mgmt/mnode/src/mmMgmt.c +++ b/source/dnode/mgmt/mnode/src/mmMgmt.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -#include "dmMgmt.h" +#include "dmInt.h" #include "dndTransport.h" #if 0 diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 7cb7a4f9e26ecc9ad2c888ba9b1ed5b3b880ea21..42b0dc52925ee4cb4eef884d94505d9c00f6d3a4 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -#include "dmMgmt.h" +#include "dmInt.h" #include "dndTransport.h" #include "dndWorker.h" @@ -268,6 +268,6 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { #endif -void mmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} -void mmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} -void mmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} \ No newline at end of file +void mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} +void mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} +void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} \ No newline at end of file diff --git a/source/dnode/mgmt/qnode/inc/qmHandle.h b/source/dnode/mgmt/qnode/inc/qmHandle.h index 7dd761e917ea6d22e55cb05e2861ee71511d0c3f..3bd434772fa2c4e405550ebdf4b54de39572ee1e 100644 --- a/source/dnode/mgmt/qnode/inc/qmHandle.h +++ b/source/dnode/mgmt/qnode/inc/qmHandle.h @@ -25,6 +25,9 @@ extern "C" { void qmInitMsgHandles(SMgmtWrapper *pWrapper); SMsgHandle qmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); +int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index e1b275999f0b15954951fe7b9fd05de4c9c7a6cf..43a3cae11b61889a3dc22c7c610ba68a6f4f942e 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -37,15 +37,17 @@ typedef struct SQnodeMgmt { bool singleProc; } SQnodeMgmt; -SMgmtFp qmGetMgmtFp(); +void qmGetMgmtFp(SMgmtWrapper *pMgmt); int32_t dndInitQnode(SDnode *pDnode); void dndCleanupQnode(SDnode *pDnode); void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); + +// qmHandle.h +int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/qnode/src/qmHandle.c b/source/dnode/mgmt/qnode/src/qmHandle.c index d7e55e1fa524be6a82782423c333d0b6380027a7..0471d3602e4514f8a7beeb7ed11e2af0597ef105 100644 --- a/source/dnode/mgmt/qnode/src/qmHandle.c +++ b/source/dnode/mgmt/qnode/src/qmHandle.c @@ -17,6 +17,9 @@ #include "qmHandle.h" #include "qmWorker.h" +int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg){return 0;} + void qmInitMsgHandles(SMgmtWrapper *pWrapper) { } diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index 3b2d3cadbdebe8053fb2a18838d2ee679d3fb0f7..df0f0388a5c38a0119121a1301b2ad3bb6bcbab5 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -19,11 +19,14 @@ bool qmRequireNode(SMgmtWrapper *pWrapper) { return false; } -SMgmtFp qmGetMgmtFp() { +void qmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = qmRequireNode; mgmtFp.getMsgHandleFp = qmGetMsgHandle; - return mgmtFp; + + // qmInitMsgHandles(pWrapper); + pWrapper->name = "qnode"; + pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/qnode/src/qmMgmt.c b/source/dnode/mgmt/qnode/src/qmMgmt.c index b5ac0c816f3eb1e0cc8db294391a64dd14d36c1c..b5147cdf938a91c360e972a07b784c2e256cbb42 100644 --- a/source/dnode/mgmt/qnode/src/qmMgmt.c +++ b/source/dnode/mgmt/qnode/src/qmMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE // #include "dndQnode.h" -// #include "dmMgmt.h" +// #include "dmInt.h" // #include "dndTransport.h" // #include "dndWorker.h" @@ -267,7 +267,7 @@ static int32_t dndDropQnode(SDnode *pDnode) { return 0; } -int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { SDCreateQnodeReq createReq = {0}; if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -283,7 +283,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { SDDropQnodeReq dropReq = {0}; if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index 59e104aeb36b08b47dfa59063090e1e7d242a6a5..95177b7edf38270d3b573175573550adcd22713d 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -37,14 +37,14 @@ typedef struct SSnodeMgmt { } SSnodeMgmt; -SMgmtFp smGetMgmtFp(); +void smGetMgmtFp(SMgmtWrapper *pMgmt); int32_t dndInitSnode(SDnode *pDnode); void dndCleanupSnode(SDnode *pDnode); void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/src/smHandle.c b/source/dnode/mgmt/snode/src/smHandle.c index 82f144f935473a6f03f2eaead75eb38e6674d0c3..e785d207c4b53556e6da0517d2519e2a2170f979 100644 --- a/source/dnode/mgmt/snode/src/smHandle.c +++ b/source/dnode/mgmt/snode/src/smHandle.c @@ -17,6 +17,9 @@ #include "smHandle.h" #include "smWorker.h" +int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} +int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {return 0;} + void smInitMsgHandles(SMgmtWrapper *pWrapper) { } diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 3a7498edbd7ee41d256c94007d3242610cb24b38..1bb3572163e1e9aeeefa080b9d6acf2d6eb98e7e 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -20,11 +20,14 @@ bool smRequireNode(SMgmtWrapper *pWrapper) { return false; } -SMgmtFp smGetMgmtFp() { +void smGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = NULL; mgmtFp.closeFp = NULL; mgmtFp.requiredFp = smRequireNode; mgmtFp.getMsgHandleFp = smGetMsgHandle; - return mgmtFp; + + // smInitMsgHandles(pWrapper); + pWrapper->name = "snode"; + pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/snode/src/smMgmt.c b/source/dnode/mgmt/snode/src/smMgmt.c index 8baa017f7c84869f1fa085385730570392137093..9e3d711dc571c3bb910057e10200d7c0a200717a 100644 --- a/source/dnode/mgmt/snode/src/smMgmt.c +++ b/source/dnode/mgmt/snode/src/smMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE // #include "dndSnode.h" -// #include "dmMgmt.h" +// #include "dmInt.h" // #include "dndTransport.h" // #include "dndWorker.h" @@ -261,7 +261,7 @@ static int32_t dndDropSnode(SDnode *pDnode) { return 0; } -int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t smProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { SDCreateSnodeReq createReq = {0}; if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -277,7 +277,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t smProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { SDDropSnodeReq dropReq = {0}; if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/dnode/mgmt/vnode/inc/vmHandle.h b/source/dnode/mgmt/vnode/inc/vmHandle.h index 0cb0bae87fcab1f9fd0ce7a9799ff7062880450c..7401c5d8f1c2aade950bc0a23ec8043384a43844 100644 --- a/source/dnode/mgmt/vnode/inc/vmHandle.h +++ b/source/dnode/mgmt/vnode/inc/vmHandle.h @@ -25,6 +25,12 @@ extern "C" { void vmInitMsgHandles(SMgmtWrapper *pWrapper); SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex); +int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index b5e44361368661c77067445d135cd187babf0531..42fa1ccb808dec5781bdcb9418dec62f52de70eb 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -48,22 +48,22 @@ typedef struct SVnodesMgmt { bool singleProc; } SVnodesMgmt; -SMgmtFp vmGetMgmtFp() ; +void vmGetMgmtFp(SMgmtWrapper *pMgmt) ; int32_t dndInitVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode); -void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads); +void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); -int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); int32_t vmGetTfsMonitorInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo); void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo); diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h index 50ca076e10ede1edc15a93ebfa61095049b1176a..b13d2412c0b619751fb4024e7516597558ee874b 100644 --- a/source/dnode/mgmt/vnode/inc/vmWorker.h +++ b/source/dnode/mgmt/vnode/inc/vmWorker.h @@ -31,10 +31,10 @@ int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void vmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void vmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void vmProcessQueryMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void vmProcessFetchMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/src/vmHandle.c b/source/dnode/mgmt/vnode/src/vmHandle.c index 10c9db869054c1c0c7f9d7be3cb4a6ecf2194366..15f1aa472952fe223e2ab22cc69aa5428de37bde 100644 --- a/source/dnode/mgmt/vnode/src/vmHandle.c +++ b/source/dnode/mgmt/vnode/src/vmHandle.c @@ -17,6 +17,14 @@ #include "vmHandle.h" #include "vmWorker.h" + +int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} +int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} +int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} +int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} +int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq){return 0;} + static void vmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 587ba30134b35d29a635b65c871f6648b3850328..2e747b86b672d6d9e3e39ad00c0d52095e4bf0bb 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -19,6 +19,23 @@ #include "vmMgmt.h" static int32_t vmInit(SMgmtWrapper *pWrapper) { + // SDiskCfg dCfg = {0}; + // tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN); + // dCfg.level = 0; + // dCfg.primary = 1; + // SDiskCfg *pDisks = pDnode->cfg.pDisks; + // int32_t numOfDisks = pDnode->cfg.numOfDisks; + // if (numOfDisks <= 0 || pDisks == NULL) { + // pDisks = &dCfg; + // numOfDisks = 1; + // } + + // pDnode->pTfs = tfsOpen(pDisks, numOfDisks); + // if (pDnode->pTfs == NULL) { + // dError("failed to init tfs since %s", terrstr()); + // return -1; + // } + SVnodeOpt vnodeOpt = {0}; vnodeOpt.nthreads = tsNumOfCommitThreads; vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ; @@ -45,11 +62,14 @@ static void vmCleanup(SMgmtWrapper *pWrapper) { static bool vmRequire(SMgmtWrapper *pWrapper) { return false; } -SMgmtFp vmGetMgmtFp() { +void vmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = vmInit; mgmtFp.closeFp = vmCleanup; mgmtFp.requiredFp = vmRequire; mgmtFp.getMsgHandleFp = vmGetMsgHandle; - return mgmtFp; + + vmInitMsgHandles(pWrapper); + pWrapper->name = "vnodes"; + pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/vnode/src/vmMgmt.c b/source/dnode/mgmt/vnode/src/vmMgmt.c index ac50a3a1b347e9956307ec86490893bbc37bc9e4..510b0b719461817cb81592d180edb79e0ab3b5b5 100644 --- a/source/dnode/mgmt/vnode/src/vmMgmt.c +++ b/source/dnode/mgmt/vnode/src/vmMgmt.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "vmMgmt.h" -#include "dmMgmt.h" +#include "dmInt.h" #include "dndTransport.h" // #include "sync.h" @@ -539,7 +539,7 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra pCfg->vgVersion = pCreate->vgVersion; } -int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t vmProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SCreateVnodeReq createReq = {0}; if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -597,7 +597,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return 0; } -int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t vmProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SAlterVnodeReq alterReq = {0}; if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -638,7 +638,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return code; } -int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t vmProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SDropVnodeReq dropReq = {0}; if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -668,7 +668,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return 0; } -int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t vmProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SSyncVnodeReq syncReq = {0}; tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq); @@ -691,7 +691,7 @@ int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return 0; } -int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { +int32_t vmProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SCompactVnodeReq compatcReq = {0}; tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq); @@ -979,7 +979,7 @@ void dndCleanupVnodes(SDnode *pDnode) { dInfo("dnode-vnodes is cleaned up"); } -void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { +void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesStat *pStat = &pMgmt->stat; int32_t totalVnodes = 0; @@ -1078,4 +1078,6 @@ void vmGetVndMonitorInfo(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) { pInfo->errors = tsNumOfErrorLogs; pInfo->vnodes_num = pStat->totalVnodes; pInfo->masters = pStat->masterNum; -} \ No newline at end of file +} + +void vmGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {} \ No newline at end of file diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 4518d98487cc945090e67920bf9e77c803b74e0a..a7111e743cc27ebb150772f8d7bcdb6ba4460437 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmWorker.h" -void vmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -void vmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -void vmProcessQueryMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -void vmProcessFetchMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} \ No newline at end of file +void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} +void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} +void vmProcessQueryMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} +void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} \ No newline at end of file