diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index d1e27a3ef17b7c57dd922e50773f9cee902dc371..5630872318a83ee4b59159af12e18fc7d7a49281 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -128,7 +128,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { } void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { - rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg); + if (tsDnodeMClientRpc) { + rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg); + } } static bool dnodeReadMnodeIpList() { diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 3dde3f8d5073d601b7d744e1ccb9fe91824c21c1..ee2143ff6d1d8c2ba2dddcb173506a27f4603530 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -178,9 +178,9 @@ static void dnodeCleanUpSystem() { tclearModuleStatus(TSDB_MOD_MGMT); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeCleanupShell(); - dnodeCleanupMClient(); dnodeCleanupMnode(); dnodeCleanupMgmt(); + dnodeCleanupMClient(); dnodeCleanupWrite(); dnodeCleanupRead(); dnodeCleanUpModules(); diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index ffb1ca785c5692daefd77cab4efb8ab16b8015ac..30b121c54b493a0921775ae91c1f322b629b2e7b 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -24,6 +24,7 @@ extern "C" { int32_t mgmtInitDnodes(); void mgmtCleanUpDnodes(); int32_t mgmtGetDnodesNum(); +void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode); SDnodeObj* mgmtGetDnode(int32_t dnodeId); SDnodeObj* mgmtGetDnodeByIp(uint32_t ip); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index cb4857fa2c9bf424c8989fd6aa79f44d88d5dff7..7d06329a465a825c412c617660375842ed541494 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -18,47 +18,37 @@ #include "mgmtBalance.h" #include "mgmtDnode.h" -int32_t (*mgmtInitBalanceFp)() = NULL; -void (*mgmtCleanupBalanceFp)() = NULL; -void (*mgmtStartBalanceTimerFp)(int32_t afterMs) = NULL; -int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL; - -int32_t mgmtInitBalance() { - if (mgmtInitBalanceFp) { - return (*mgmtInitBalanceFp)(); - } else { - return 0; - } -} - -void mgmtCleanupBalance() { - if (mgmtCleanupBalanceFp) { - (*mgmtCleanupBalanceFp)(); - } -} - -void mgmtStartBalanceTimer(int32_t afterMs) { - if (mgmtStartBalanceTimerFp) { - (*mgmtStartBalanceTimerFp)(afterMs); - } -} +int32_t mgmtInitBalance() { return 0; } +void mgmtCleanupBalance() {} +void mgmtStartBalanceTimer(int32_t afterMs) {} int32_t mgmtAllocVnodes(SVgObj *pVgroup) { - if (mgmtAllocVnodesFp) { - return (*mgmtAllocVnodesFp)(pVgroup); + void * pNode = NULL; + SDnodeObj *pDnode = NULL; + SDnodeObj *pSelDnode = NULL; + float vnodeUsage = 1.0; + + while (1) { + pNode = mgmtGetNextDnode(pNode, &pDnode); + if (pDnode == NULL) break; + if (pDnode->numOfTotalVnodes <= 0) continue; + if (pDnode->openVnodes == pDnode->numOfTotalVnodes) continue; + + float usage = (float)pDnode->openVnodes / pDnode->numOfTotalVnodes; + if (usage <= vnodeUsage) { + pSelDnode = pDnode; + vnodeUsage = usage; + } } - SDnodeObj *pDnode = mgmtGetDnode(1); - if (pDnode == NULL) return TSDB_CODE_OTHERS; - - if (pDnode->openVnodes < pDnode->numOfTotalVnodes) { - pVgroup->vnodeGid[0].dnodeId = pDnode->dnodeId; - pVgroup->vnodeGid[0].privateIp = pDnode->privateIp; - pVgroup->vnodeGid[0].publicIp = pDnode->publicIp; - mTrace("dnode:%d, alloc one vnode to vgroup", pDnode->dnodeId); - return TSDB_CODE_SUCCESS; - } else { - mError("dnode:%d, failed to alloc vnode to vgroup", pDnode->dnodeId); + if (pSelDnode == NULL) { + mError("failed to alloc vnode to vgroup", pDnode->dnodeId); return TSDB_CODE_NO_ENOUGH_DNODES; } + + pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId; + pVgroup->vnodeGid[0].privateIp = pSelDnode->privateIp; + pVgroup->vnodeGid[0].publicIp = pSelDnode->publicIp; + mTrace("dnode:%d, alloc one vnode to vgroup", pSelDnode->dnodeId); + return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 0cabaa813544f72ef990b02c097db4b27d1988c3..d7e2270107a79b061b32161aee3556da9f031d72 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -32,6 +32,7 @@ static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); extern int32_t clusterInit(); extern void clusterCleanUp(); extern int32_t clusterGetDnodesNum(); +extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode); extern SDnodeObj* clusterGetDnode(int32_t dnodeId); extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip); static SDnodeObj tsDnodeObj = {0}; @@ -98,6 +99,19 @@ int32_t mgmtGetDnodesNum() { #endif } +void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) { +#ifdef _CLUSTER + return (*clusterGetNextDnode)(pNode, pDnode); +#else + if (*pDnode == NULL) { + *pDnode = &tsDnodeObj; + } else { + *pDnode = NULL; + } + return *pDnode; +#endif +} + void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; if (mgmtCheckRedirect(pMsg->thandle)) return;