未验证 提交 f1a90648 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1489 from taosdata/refactor/cluster

[TD-17] fix error while alloc vnodes
...@@ -128,7 +128,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -128,7 +128,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
} }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
if (tsDnodeMClientRpc) {
rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg); rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg);
}
} }
static bool dnodeReadMnodeIpList() { static bool dnodeReadMnodeIpList() {
......
...@@ -178,9 +178,9 @@ static void dnodeCleanUpSystem() { ...@@ -178,9 +178,9 @@ static void dnodeCleanUpSystem() {
tclearModuleStatus(TSDB_MOD_MGMT); tclearModuleStatus(TSDB_MOD_MGMT);
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell(); dnodeCleanupShell();
dnodeCleanupMClient();
dnodeCleanupMnode(); dnodeCleanupMnode();
dnodeCleanupMgmt(); dnodeCleanupMgmt();
dnodeCleanupMClient();
dnodeCleanupWrite(); dnodeCleanupWrite();
dnodeCleanupRead(); dnodeCleanupRead();
dnodeCleanUpModules(); dnodeCleanUpModules();
......
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
int32_t mgmtInitDnodes(); int32_t mgmtInitDnodes();
void mgmtCleanUpDnodes(); void mgmtCleanUpDnodes();
int32_t mgmtGetDnodesNum(); int32_t mgmtGetDnodesNum();
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
SDnodeObj* mgmtGetDnode(int32_t dnodeId); SDnodeObj* mgmtGetDnode(int32_t dnodeId);
SDnodeObj* mgmtGetDnodeByIp(uint32_t ip); SDnodeObj* mgmtGetDnodeByIp(uint32_t ip);
......
...@@ -18,47 +18,37 @@ ...@@ -18,47 +18,37 @@
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
int32_t (*mgmtInitBalanceFp)() = NULL; int32_t mgmtInitBalance() { return 0; }
void (*mgmtCleanupBalanceFp)() = NULL; void mgmtCleanupBalance() {}
void (*mgmtStartBalanceTimerFp)(int32_t afterMs) = NULL; void mgmtStartBalanceTimer(int32_t afterMs) {}
int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL;
int32_t mgmtInitBalance() { int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
if (mgmtInitBalanceFp) { void * pNode = NULL;
return (*mgmtInitBalanceFp)(); SDnodeObj *pDnode = NULL;
} else { SDnodeObj *pSelDnode = NULL;
return 0; float vnodeUsage = 1.0;
}
} while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
void mgmtCleanupBalance() { if (pDnode == NULL) break;
if (mgmtCleanupBalanceFp) { if (pDnode->numOfTotalVnodes <= 0) continue;
(*mgmtCleanupBalanceFp)(); if (pDnode->openVnodes == pDnode->numOfTotalVnodes) continue;
float usage = (float)pDnode->openVnodes / pDnode->numOfTotalVnodes;
if (usage <= vnodeUsage) {
pSelDnode = pDnode;
vnodeUsage = usage;
} }
}
void mgmtStartBalanceTimer(int32_t afterMs) {
if (mgmtStartBalanceTimerFp) {
(*mgmtStartBalanceTimerFp)(afterMs);
} }
}
int32_t mgmtAllocVnodes(SVgObj *pVgroup) { if (pSelDnode == NULL) {
if (mgmtAllocVnodesFp) { mError("failed to alloc vnode to vgroup", pDnode->dnodeId);
return (*mgmtAllocVnodesFp)(pVgroup); return TSDB_CODE_NO_ENOUGH_DNODES;
} }
SDnodeObj *pDnode = mgmtGetDnode(1); pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId;
if (pDnode == NULL) return TSDB_CODE_OTHERS; pVgroup->vnodeGid[0].privateIp = pSelDnode->privateIp;
pVgroup->vnodeGid[0].publicIp = pSelDnode->publicIp;
if (pDnode->openVnodes < pDnode->numOfTotalVnodes) { mTrace("dnode:%d, alloc one vnode to vgroup", pSelDnode->dnodeId);
pVgroup->vnodeGid[0].dnodeId = pDnode->dnodeId;
pVgroup->vnodeGid[0].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[0].publicIp = pDnode->publicIp;
mTrace("dnode:%d, alloc one vnode to vgroup", pDnode->dnodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else {
mError("dnode:%d, failed to alloc vnode to vgroup", pDnode->dnodeId);
return TSDB_CODE_NO_ENOUGH_DNODES;
}
} }
...@@ -32,6 +32,7 @@ static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); ...@@ -32,6 +32,7 @@ static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg);
extern int32_t clusterInit(); extern int32_t clusterInit();
extern void clusterCleanUp(); extern void clusterCleanUp();
extern int32_t clusterGetDnodesNum(); extern int32_t clusterGetDnodesNum();
extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode);
extern SDnodeObj* clusterGetDnode(int32_t dnodeId); extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip); extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip);
static SDnodeObj tsDnodeObj = {0}; static SDnodeObj tsDnodeObj = {0};
...@@ -98,6 +99,19 @@ int32_t mgmtGetDnodesNum() { ...@@ -98,6 +99,19 @@ int32_t mgmtGetDnodesNum() {
#endif #endif
} }
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) {
#ifdef _CLUSTER
return (*clusterGetNextDnode)(pNode, pDnode);
#else
if (*pDnode == NULL) {
*pDnode = &tsDnodeObj;
} else {
*pDnode = NULL;
}
return *pDnode;
#endif
}
void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mgmtCheckRedirect(pMsg->thandle)) return; if (mgmtCheckRedirect(pMsg->thandle)) return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册