diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index 7662d492805ca3a948f01b4a87e44bcaf01b79e5..ff4beea6e20a09ef5a670ab2c33e9a6e248ffcff 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -236,7 +236,7 @@ # httpDebugFlag 131 # debug flag for monitor -# monitorDebugFlag 131 +# monDebugFlag 131 # debug flag for query # qDebugflag 131 diff --git a/src/balance/inc/bnInt.h b/src/balance/inc/bnInt.h new file mode 100644 index 0000000000000000000000000000000000000000..e924776ff151a9e81fe1254cdd1f1def04e6fef3 --- /dev/null +++ b/src/balance/inc/bnInt.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#ifndef TDENGINE_BALANCE_INT_H +#define TDENGINE_BALANCE_INT_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "mnodeInt.h" +#include "mnodeDef.h" +#include "mnodeDnode.h" + +typedef struct { + int32_t size; + int32_t maxSize; + SDnodeObj **list; +} SBnDnodes; + +typedef struct { + void * timer; + bool stop; + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t thread; +} SBnThread; + +typedef struct { + pthread_mutex_t mutex; +} SBnMgmt; + +int32_t bnInit(); +void bnCleanUp(); +bool bnStart(); +void bnCheckStatus(); +void bnCheckModules(); + +extern SBnDnodes tsBnDnodes; +extern void *tsMnodeTmr; + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/balance/inc/bnScore.h b/src/balance/inc/bnScore.h new file mode 100644 index 0000000000000000000000000000000000000000..a28c4459dd444001674305e404bfb0d23ed2fc3b --- /dev/null +++ b/src/balance/inc/bnScore.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#ifndef TDENGINE_BALANCE_SCORE_H +#define TDENGINE_BALANCE_SCORE_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "bnInt.h" + +void bnInitDnodes(); +void bnCleanupDnodes(); +void bnAccquireDnodes(); +void bnReleaseDnodes(); +float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/balance/inc/bnThread.h b/src/balance/inc/bnThread.h new file mode 100644 index 0000000000000000000000000000000000000000..8f54b66028c5ddc859861bb78e0d790f7d3e5507 --- /dev/null +++ b/src/balance/inc/bnThread.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#ifndef TDENGINE_BALANCE_THREAD_H +#define TDENGINE_BALANCE_THREAD_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "bnInt.h" + +int32_t bnInitThread(); +void bnCleanupThread(); +void bnNotify(); +void bnStartTimer(int64_t mseconds); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/balance/src/balance.c b/src/balance/src/bnMain.c similarity index 50% rename from src/balance/src/balance.c rename to src/balance/src/bnMain.c index b172867929c5fe2850b665ad5c4987be0c4b0e26..383f98191313d3a40f0899787c0f421f2d741cef 100644 --- a/src/balance/src/balance.c +++ b/src/balance/src/bnMain.c @@ -15,17 +15,12 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "tutil.h" -#include "tbalance.h" #include "tsync.h" -#include "ttimer.h" #include "tglobal.h" -#include "tdataformat.h" #include "dnode.h" -#include "mnode.h" -#include "mnodeDef.h" -#include "mnodeInt.h" -#include "mnodeDnode.h" +#include "bnInt.h" +#include "bnScore.h" +#include "bnThread.h" #include "mnodeDb.h" #include "mnodeMnode.h" #include "mnodeSdb.h" @@ -33,36 +28,18 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -/* - * once sdb work as mater, then tsAccessSquence reset to zero - * increase tsAccessSquence every balance interval - */ -extern void * tsMnodeTmr; -static void * tsBalanceTimer = NULL; -static int32_t tsBalanceDnodeListSize = 0; -static SDnodeObj ** tsBalanceDnodeList = NULL; -static int32_t tsBalanceDnodeListMallocSize = 16; -static pthread_mutex_t tsBalanceMutex; - -static void balanceStartTimer(int64_t mseconds); -static void balanceInitDnodeList(); -static void balanceCleanupDnodeList(); -static void balanceAccquireDnodeList(); -static void balanceReleaseDnodeList(); -static void balanceMonitorDnodeModule(); -static float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode); -static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn); 
- -static void balanceLock() { - pthread_mutex_lock(&tsBalanceMutex); +static SBnMgmt tsBnMgmt;; +static void bnMonitorDnodeModule(); + +static void bnLock() { + pthread_mutex_lock(&tsBnMgmt.mutex); } -static void balanceUnLock() { - pthread_mutex_unlock(&tsBalanceMutex); +static void bnUnLock() { + pthread_mutex_unlock(&tsBnMgmt.mutex); } -static bool balanceCheckFree(SDnodeObj *pDnode) { +static bool bnCheckFree(SDnodeObj *pDnode) { if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) { mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status)); return false; @@ -86,7 +63,7 @@ static bool balanceCheckFree(SDnodeObj *pDnode) { return true; } -static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { +static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { mDebug("vgId:%d, dnode:%d is dropping", pVgroup->vgId, pVnodeGid->dnodeId); SDnodeObj *pDnode = mnodeGetDnode(pVnodeGid->dnodeId); @@ -111,27 +88,26 @@ static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { mnodeUpdateVgroup(pVgroup); } -static void balanceSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) { +static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) { // SVnodeGid tmp = *pVnodeGid1; // *pVnodeGid1 = *pVnodeGid2; // *pVnodeGid2 = tmp; } -int32_t balanceAllocVnodes(SVgObj *pVgroup) { +int32_t bnAllocVnodes(SVgObj *pVgroup) { static int32_t randIndex = 0; int32_t dnode = 0; int32_t vnodes = 0; - balanceLock(); - - balanceAccquireDnodeList(); + bnLock(); + bnAccquireDnodes(); mDebug("db:%s, try alloc %d vnodes to vgroup, dnodes total:%d, avail:%d", pVgroup->dbName, pVgroup->numOfVnodes, - mnodeGetDnodesNum(), tsBalanceDnodeListSize); + mnodeGetDnodesNum(), tsBnDnodes.size); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - for (; dnode < tsBalanceDnodeListSize; ++dnode) { - SDnodeObj *pDnode = tsBalanceDnodeList[dnode]; - if 
(balanceCheckFree(pDnode)) { + for (; dnode < tsBnDnodes.size; ++dnode) { + SDnodeObj *pDnode = tsBnDnodes.list[dnode]; + if (bnCheckFree(pDnode)) { SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; pVnodeGid->dnodeId = pDnode->dnodeId; pVnodeGid->pDnode = pDnode; @@ -148,8 +124,8 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) { } if (vnodes != pVgroup->numOfVnodes) { - balanceReleaseDnodeList(); - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); mDebug("db:%s, need vnodes:%d, but alloc:%d", pVgroup->dbName, pVgroup->numOfVnodes, vnodes); @@ -179,33 +155,33 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) { if (pVgroup->numOfVnodes == 1) { } else if (pVgroup->numOfVnodes == 2) { if (randIndex++ % 2 == 0) { - balanceSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1); + bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1); } } else { int32_t randVal = randIndex++ % 6; if (randVal == 1) { // 1, 0, 2 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); } else if (randVal == 2) { // 1, 2, 0 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); - balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); + bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); } else if (randVal == 3) { // 2, 1, 0 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); } else if (randVal == 4) { // 2, 0, 1 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); - balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); } if (randVal == 5) { // 0, 2, 1 - balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); } else 
{ } // 0, 1, 2 } - balanceReleaseDnodeList(); - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); return TSDB_CODE_SUCCESS; } -static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { +static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { if (pVgroup->lbTime + 5 * tsStatusInterval > tsAccessSquence) { return false; } @@ -232,7 +208,7 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { * desc: remove one vnode from vgroup * all vnodes in vgroup should in ready state, except the balancing one **/ -static int32_t balanceRemoveVnode(SVgObj *pVgroup) { +static int32_t bnRemoveVnode(SVgObj *pVgroup) { if (pVgroup->numOfVnodes <= 1) return -1; SVnodeGid *pRmVnode = NULL; @@ -274,17 +250,17 @@ static int32_t balanceRemoveVnode(SVgObj *pVgroup) { pSelVnode = pRmVnode; } - if (!balanceCheckVgroupReady(pVgroup, pSelVnode)) { + if (!bnCheckVgroupReady(pVgroup, pSelVnode)) { mDebug("vgId:%d, is not ready", pVgroup->vgId); return -1; } else { mDebug("vgId:%d, is ready, discard dnode:%d", pVgroup->vgId, pSelVnode->dnodeId); - balanceDiscardVnode(pVgroup, pSelVnode); + bnDiscardVnode(pVgroup, pSelVnode); return TSDB_CODE_SUCCESS; } } -static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { +static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SVnodeGid *pGid = &pVgroup->vnodeGid[i]; if (pGid->dnodeId == 0) break; @@ -299,13 +275,13 @@ static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { /** * desc: add vnode to vgroup, find a new one if dest dnode is null **/ -static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { +static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { if (pDestDnode == NULL) { - for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) { - SDnodeObj *pDnode = tsBalanceDnodeList[i]; + for (int32_t i = 0; i < 
tsBnDnodes.size; ++i) { + SDnodeObj *pDnode = tsBnDnodes.list[i]; if (pDnode == pSrcDnode) continue; - if (balanceCheckDnodeInVgroup(pDnode, pVgroup)) continue; - if (!balanceCheckFree(pDnode)) continue; + if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue; + if (!bnCheckFree(pDnode)) continue; pDestDnode = pDnode; mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId); @@ -333,25 +309,25 @@ static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj return TSDB_CODE_SUCCESS; } -static bool balanceMonitorBalance() { - if (tsBalanceDnodeListSize < 2) return false; +static bool bnMonitorBalance() { + if (tsBnDnodes.size < 2) return false; - for (int32_t src = tsBalanceDnodeListSize - 1; src >= 0; --src) { - SDnodeObj *pDnode = tsBalanceDnodeList[src]; - mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBalanceDnodeListSize - src - 1, + for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) { + SDnodeObj *pDnode = tsBnDnodes.list[src]; + mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1, pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores, pDnode->openVnodes); } - float scoresDiff = tsBalanceDnodeList[tsBalanceDnodeListSize - 1]->score - tsBalanceDnodeList[0]->score; + float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score; if (scoresDiff < 0.01) { - mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBalanceDnodeListSize, scoresDiff); + mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff); return false; } - for (int32_t src = tsBalanceDnodeListSize - 1; src > 0; --src) { - SDnodeObj *pSrcDnode = tsBalanceDnodeList[src]; - float srcScore = balanceTryCalcDnodeScore(pSrcDnode, -1); + for (int32_t src = tsBnDnodes.size - 1; src > 0; --src) { + SDnodeObj *pSrcDnode = tsBnDnodes.list[src]; + float srcScore = bnTryCalcDnodeScore(pSrcDnode, 
-1); if (tsEnableBalance == 0 && pSrcDnode->status != TAOS_DN_STATUS_DROPPING) { continue; } @@ -362,19 +338,19 @@ static bool balanceMonitorBalance() { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; - if (balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) { + if (bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) { for (int32_t dest = 0; dest < src; dest++) { - SDnodeObj *pDestDnode = tsBalanceDnodeList[dest]; - if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) continue; + SDnodeObj *pDestDnode = tsBnDnodes.list[dest]; + if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue; - float destScore = balanceTryCalcDnodeScore(pDestDnode, 1); + float destScore = bnTryCalcDnodeScore(pDestDnode, 1); if (srcScore + 0.0001 < destScore) continue; - if (!balanceCheckFree(pDestDnode)) continue; + if (!bnCheckFree(pDestDnode)) continue; mDebug("vgId:%d, balance from dnode:%d to dnode:%d, srcScore:%.1f:%.1f, destScore:%.1f:%.1f", pVgroup->vgId, pSrcDnode->dnodeId, pDestDnode->dnodeId, pSrcDnode->score, srcScore, pDestDnode->score, destScore); - balanceAddVnode(pVgroup, pSrcDnode, pDestDnode); + bnAddVnode(pVgroup, pSrcDnode, pDestDnode); mnodeDecVgroupRef(pVgroup); mnodeCancelGetNextVgroup(pIter); return true; @@ -392,7 +368,7 @@ static bool balanceMonitorBalance() { // 1. reset balanceAccessSquence to zero // 2. reset state of dnodes to offline // 3. 
reset lastAccess of dnodes to zero -void balanceReset() { +void bnReset() { void * pIter = NULL; SDnodeObj *pDnode = NULL; while (1) { @@ -413,7 +389,7 @@ void balanceReset() { tsAccessSquence = 0; } -static int32_t balanceMonitorVgroups() { +static int32_t bnMonitorVgroups() { void * pIter = NULL; SVgObj *pVgroup = NULL; bool hasUpdatingVgroup = false; @@ -429,11 +405,11 @@ static int32_t balanceMonitorVgroups() { if (vgReplica > dbReplica) { mInfo("vgId:%d, replica:%d numOfVnodes:%d, try remove one vnode", pVgroup->vgId, dbReplica, vgReplica); hasUpdatingVgroup = true; - code = balanceRemoveVnode(pVgroup); + code = bnRemoveVnode(pVgroup); } else if (vgReplica < dbReplica) { mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica); hasUpdatingVgroup = true; - code = balanceAddVnode(pVgroup, NULL, NULL); + code = bnAddVnode(pVgroup, NULL, NULL); } mnodeDecVgroupRef(pVgroup); @@ -446,7 +422,7 @@ static int32_t balanceMonitorVgroups() { return hasUpdatingVgroup; } -static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) { +static bool bnMonitorDnodeDropping(SDnodeObj *pDnode) { mDebug("dnode:%d, in dropping state", pDnode->dnodeId); void * pIter = NULL; @@ -456,7 +432,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; - hasThisDnode = balanceCheckDnodeInVgroup(pDnode, pVgroup); + hasThisDnode = bnCheckDnodeInVgroup(pDnode, pVgroup); mnodeDecVgroupRef(pVgroup); if (hasThisDnode) { @@ -474,7 +450,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) { return false; } -static bool balanceMontiorDropping() { +static bool bnMontiorDropping() { void *pIter = NULL; SDnodeObj *pDnode = NULL; @@ -499,7 +475,7 @@ static bool balanceMontiorDropping() { } if (pDnode->status == TAOS_DN_STATUS_DROPPING) { - bool ret = balanceMonitorDnodeDropping(pDnode); + bool ret = bnMonitorDnodeDropping(pDnode); mnodeDecDnodeRef(pDnode); 
mnodeCancelGetNextDnode(pIter); return ret; @@ -509,33 +485,31 @@ static bool balanceMontiorDropping() { return false; } -static bool balanceStart() { +bool bnStart() { if (!sdbIsMaster()) return false; - balanceLock(); + bnLock(); + bnAccquireDnodes(); - balanceAccquireDnodeList(); + bnMonitorDnodeModule(); - balanceMonitorDnodeModule(); - - bool updateSoon = balanceMontiorDropping(); + bool updateSoon = bnMontiorDropping(); if (!updateSoon) { - updateSoon = balanceMonitorVgroups(); + updateSoon = bnMonitorVgroups(); } if (!updateSoon) { - updateSoon = balanceMonitorBalance(); + updateSoon = bnMonitorBalance(); } - balanceReleaseDnodeList(); - - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); return updateSoon; } -static void balanceSetVgroupOffline(SDnodeObj* pDnode) { +static void bnSetVgroupOffline(SDnodeObj* pDnode) { void *pIter = NULL; while (1) { SVgObj *pVgroup; @@ -551,7 +525,7 @@ static void balanceSetVgroupOffline(SDnodeObj* pDnode) { } } -static void balanceCheckDnodeAccess() { +void bnCheckStatus() { void * pIter = NULL; SDnodeObj *pDnode = NULL; @@ -564,84 +538,39 @@ static void balanceCheckDnodeAccess() { pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT; mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence, pDnode->lastAccess, pDnode->status); - balanceSetVgroupOffline(pDnode); + bnSetVgroupOffline(pDnode); } } mnodeDecDnodeRef(pDnode); } } -static void balanceProcessBalanceTimer(void *handle, void *tmrId) { - if (!sdbIsMaster()) return; - - tsBalanceTimer = NULL; - tsAccessSquence ++; - - balanceCheckDnodeAccess(); - bool updateSoon = false; - - if (handle == NULL) { - if (tsAccessSquence % tsBalanceInterval == 0) { - mDebug("balance function is scheduled by timer"); - updateSoon = balanceStart(); - } - } else { - int64_t mseconds = (int64_t)handle; - mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds); - updateSoon = balanceStart(); - } - - 
if (updateSoon) { - balanceStartTimer(1000); - } else { - taosTmrReset(balanceProcessBalanceTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBalanceTimer); - } -} - -static void balanceStartTimer(int64_t mseconds) { - taosTmrReset(balanceProcessBalanceTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBalanceTimer); -} - -void balanceSyncNotify() { +void bnCheckModules() { if (sdbIsMaster()) { - balanceLock(); - balanceAccquireDnodeList(); - balanceMonitorDnodeModule(); - balanceReleaseDnodeList(); - balanceUnLock(); + bnLock(); + bnAccquireDnodes(); + bnMonitorDnodeModule(); + bnReleaseDnodes(); + bnUnLock(); } } -void balanceAsyncNotify() { - balanceStartTimer(500); -} - -int32_t balanceInit() { - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, balanceGetScoresMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, balanceRetrieveScores); - mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode); - - pthread_mutex_init(&tsBalanceMutex, NULL); - balanceInitDnodeList(); - balanceStartTimer(2000); - mDebug("balance start fp:%p initialized", balanceProcessBalanceTimer); - - balanceReset(); +int32_t bnInit() { + pthread_mutex_init(&tsBnMgmt.mutex, NULL); + bnInitDnodes(); + bnInitThread(); + bnReset(); return 0; } -void balanceCleanUp() { - if (tsBalanceTimer != NULL) { - taosTmrStopA(&tsBalanceTimer); - pthread_mutex_destroy(&tsBalanceMutex); - tsBalanceTimer = NULL; - mDebug("stop balance timer"); - } - balanceCleanupDnodeList(); +void bnCleanUp() { + bnCleanupThread(); + bnCleanupDnodes(); + pthread_mutex_destroy(&tsBnMgmt.mutex); } -int32_t balanceDropDnode(SDnodeObj *pDnode) { +int32_t bnDropDnode(SDnodeObj *pDnode) { int32_t totalFreeVnodes = 0; void * pIter = NULL; SDnodeObj *pTempDnode = NULL; @@ -650,7 +579,7 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) { pIter = mnodeGetNextDnode(pIter, &pTempDnode); if (pTempDnode == NULL) break; - if (pTempDnode != pDnode && balanceCheckFree(pTempDnode)) { + if (pTempDnode != pDnode && 
bnCheckFree(pTempDnode)) { totalFreeVnodes += (TSDB_MAX_VNODES - pTempDnode->openVnodes); } @@ -665,298 +594,17 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) { pDnode->status = TAOS_DN_STATUS_DROPPING; mnodeUpdateDnode(pDnode); - balanceStartTimer(1100); + bnStartTimer(1100); return TSDB_CODE_SUCCESS; } -static int32_t balanceCalcCpuScore(SDnodeObj *pDnode) { - if (pDnode->cpuAvgUsage < 80) - return 0; - else if (pDnode->cpuAvgUsage < 90) - return 10; - else - return 50; -} - -static int32_t balanceCalcMemoryScore(SDnodeObj *pDnode) { - if (pDnode->memoryAvgUsage < 80) - return 0; - else if (pDnode->memoryAvgUsage < 90) - return 10; - else - return 50; -} - -static int32_t balanceCalcDiskScore(SDnodeObj *pDnode) { - if (pDnode->diskAvgUsage < 80) - return 0; - else if (pDnode->diskAvgUsage < 90) - return 10; - else - return 50; -} - -static int32_t balanceCalcBandwidthScore(SDnodeObj *pDnode) { - if (pDnode->bandwidthUsage < 30) - return 0; - else if (pDnode->bandwidthUsage < 80) - return 10; - else - return 50; -} - -static float balanceCalcModuleScore(SDnodeObj *pDnode) { - if (pDnode->numOfCores <= 0) return 0; - if (pDnode->isMgmt) { - return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores; - } - return 0; -} - -static float balanceCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) { - if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000; - if (pDnode->numOfCores <= 0) return 0; - return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores; -} - -/** - * calc singe score, such as cpu/memory/disk/bandwitdh/vnode - * 1. get the score config - * 2. if the value is out of range, use border data - * 3. 
otherwise use interpolation method - **/ -void balanceCalcDnodeScore(SDnodeObj *pDnode) { - pDnode->score = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) + - balanceCalcBandwidthScore(pDnode) + balanceCalcModuleScore(pDnode) + - balanceCalcVnodeScore(pDnode, 0) + pDnode->customScore; -} - -float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) { - int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) + - balanceCalcBandwidthScore(pDnode); - float moduleScore = balanceCalcModuleScore(pDnode); - float vnodeScore = balanceCalcVnodeScore(pDnode, extra); - - float score = systemScore + moduleScore + vnodeScore + pDnode->customScore; - return score; -} - -static void balanceInitDnodeList() { - tsBalanceDnodeList = calloc(tsBalanceDnodeListMallocSize, sizeof(SDnodeObj *)); -} - -static void balanceCleanupDnodeList() { - if (tsBalanceDnodeList != NULL) { - free(tsBalanceDnodeList); - tsBalanceDnodeList = NULL; - } -} - -static void balanceCheckDnodeListSize(int32_t dnodesNum) { - if (tsBalanceDnodeListMallocSize <= dnodesNum) { - tsBalanceDnodeListMallocSize = dnodesNum * 2; - tsBalanceDnodeList = realloc(tsBalanceDnodeList, tsBalanceDnodeListMallocSize * sizeof(SDnodeObj *)); - } -} - -void balanceAccquireDnodeList() { - int32_t dnodesNum = mnodeGetDnodesNum(); - balanceCheckDnodeListSize(dnodesNum); - - void * pIter = NULL; - SDnodeObj *pDnode = NULL; - int32_t dnodeIndex = 0; - - while (1) { - if (dnodeIndex >= dnodesNum) { - mnodeCancelGetNextDnode(pIter); - break; - } - - pIter = mnodeGetNextDnode(pIter, &pDnode); - if (pDnode == NULL) break; - if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { - mnodeDecDnodeRef(pDnode); - continue; - } - - balanceCalcDnodeScore(pDnode); - - int32_t orderIndex = dnodeIndex; - for (; orderIndex > 0; --orderIndex) { - if (pDnode->score > tsBalanceDnodeList[orderIndex - 1]->score) { - break; - } - 
tsBalanceDnodeList[orderIndex] = tsBalanceDnodeList[orderIndex - 1]; - } - tsBalanceDnodeList[orderIndex] = pDnode; - dnodeIndex++; - } - - tsBalanceDnodeListSize = dnodeIndex; -} - -void balanceReleaseDnodeList() { - for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) { - SDnodeObj *pDnode = tsBalanceDnodeList[i]; - if (pDnode != NULL) { - mnodeDecDnodeRef(pDnode); - } - } -} - -static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - SUserObj *pUser = mnodeGetUserFromConn(pConn); - if (pUser == NULL) return 0; - - if (strcmp(pUser->pAcct->user, "root") != 0) { - mnodeDecUserRef(pUser); - return TSDB_CODE_MND_NO_RIGHTS; - } - - int32_t cols = 0; - SSchema *pSchema = pMeta->schema; - - pShow->bytes[cols] = 2; - pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "system scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "custom scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "module scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "vnode scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "total scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "open vnodes"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - 
pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "cpu cores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "balance state"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) { - pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - } - - pShow->numOfRows = mnodeGetDnodesNum(); - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - pShow->pIter = NULL; - - mnodeDecUserRef(pUser); - - return 0; -} - -static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - int32_t numOfRows = 0; - SDnodeObj *pDnode = NULL; - char * pWrite; - int32_t cols = 0; - - while (numOfRows < rows) { - pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode); - if (pDnode == NULL) break; - - int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) + - balanceCalcBandwidthScore(pDnode); - float moduleScore = balanceCalcModuleScore(pDnode); - float vnodeScore = balanceCalcVnodeScore(pDnode, 0); - - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDnode->dnodeId; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = systemScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = pDnode->customScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = (int32_t)moduleScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = (int32_t)vnodeScore; - 
cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDnode->openVnodes; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDnode->numOfCores; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status)); - cols++; - - numOfRows++; - mnodeDecDnodeRef(pDnode); - } - - mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); - pShow->numOfReads += numOfRows; - return numOfRows; -} - -static void balanceMonitorDnodeModule() { +static void bnMonitorDnodeModule() { int32_t numOfMnodes = mnodeGetMnodesNum(); if (numOfMnodes >= tsNumOfMnodes) return; - for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) { - SDnodeObj *pDnode = tsBalanceDnodeList[i]; + for (int32_t i = 0; i < tsBnDnodes.size; ++i) { + SDnodeObj *pDnode = tsBnDnodes.list[i]; if (pDnode == NULL) break; if (pDnode->isMgmt || pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) { @@ -980,7 +628,7 @@ static void balanceMonitorDnodeModule() { } } -int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) { +int32_t bnAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) { if (!sdbIsMaster()) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for self not master", pSrcDnode->dnodeId, vnodeId, dnodeId); return TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -1004,29 +652,29 @@ int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t return TSDB_CODE_MND_DNODE_NOT_EXIST; } - balanceLock(); - balanceAccquireDnodeList(); + bnLock(); + bnAccquireDnodes(); int32_t code = TSDB_CODE_SUCCESS; - if 
(!balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) { + if (!bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup not in dnode:%d", pSrcDnode->dnodeId, vnodeId, dnodeId, pSrcDnode->dnodeId); code = TSDB_CODE_MND_VGROUP_NOT_IN_DNODE; - } else if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) { + } else if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup already in dnode:%d", pSrcDnode->dnodeId, vnodeId, dnodeId, dnodeId); code = TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE; - } else if (!balanceCheckFree(pDestDnode)) { + } else if (!bnCheckFree(pDestDnode)) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for dnode:%d not free", pSrcDnode->dnodeId, vnodeId, dnodeId, dnodeId); code = TSDB_CODE_MND_DNODE_NOT_FREE; } else { - code = balanceAddVnode(pVgroup, pSrcDnode, pDestDnode); + code = bnAddVnode(pVgroup, pSrcDnode, pDestDnode); mInfo("dnode:%d, alter vgId:%d to dnode:%d, result:%s", pSrcDnode->dnodeId, vnodeId, dnodeId, tstrerror(code)); } - balanceReleaseDnodeList(); - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); mnodeDecVgroupRef(pVgroup); mnodeDecDnodeRef(pDestDnode); diff --git a/src/balance/src/bnScore.c b/src/balance/src/bnScore.c new file mode 100644 index 0000000000000000000000000000000000000000..e5ad7a211958dce9507c5fca22f57a6d99c82029 --- /dev/null +++ b/src/balance/src/bnScore.c @@ -0,0 +1,312 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tglobal.h" +#include "mnodeShow.h" +#include "mnodeUser.h" +#include "bnScore.h" + +SBnDnodes tsBnDnodes; + +static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +/* cpu score: 0 when cpuAvgUsage < 80, 10 when < 90, otherwise 50 */ +static int32_t bnCalcCpuScore(SDnodeObj *pDnode) { + if (pDnode->cpuAvgUsage < 80) + return 0; + else if (pDnode->cpuAvgUsage < 90) + return 10; + else + return 50; +} + +/* memory score: 0 when memoryAvgUsage < 80, 10 when < 90, otherwise 50 */ +static int32_t bnCalcMemoryScore(SDnodeObj *pDnode) { + if (pDnode->memoryAvgUsage < 80) + return 0; + else if (pDnode->memoryAvgUsage < 90) + return 10; + else + return 50; +} + +/* disk score: 0 when diskAvgUsage < 80, 10 when < 90, otherwise 50 */ +static int32_t bnCalcDiskScore(SDnodeObj *pDnode) { + if (pDnode->diskAvgUsage < 80) + return 0; + else if (pDnode->diskAvgUsage < 90) + return 10; + else + return 50; +} + +/* bandwidth score: 0 when bandwidthUsage < 30, 10 when < 80, otherwise 50 */ +static int32_t bnCalcBandScore(SDnodeObj *pDnode) { + if (pDnode->bandwidthUsage < 30) + return 0; + else if (pDnode->bandwidthUsage < 80) + return 10; + else + return 50; +} + +/* module score: tsMnodeEqualVnodeNum / numOfCores for a dnode with isMgmt set, otherwise 0 */ +static float bnCalcModuleScore(SDnodeObj *pDnode) { + if (pDnode->numOfCores <= 0) return 0; + if (pDnode->isMgmt) { + return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores; + } + return 0; +} + +/* vnode score: (openVnodes + extra) / numOfCores; a dropping or offline dnode gets 100000000 */ +static float bnCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) { + if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000; + if (pDnode->numOfCores <= 0) return 0; + return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores; +} + +/** + * Refresh pDnode->score: the sum of the cpu, memory, disk and bandwidth step + * scores, the module score, the vnode score (with no extra vnodes) and the
+ * customScore of the dnode. + **/ +static void bnCalcDnodeScore(SDnodeObj *pDnode) { + pDnode->score = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + + bnCalcBandScore(pDnode) + bnCalcModuleScore(pDnode) + bnCalcVnodeScore(pDnode, 0) + + pDnode->customScore; +} + +/* Return the score the dnode would have with `extra` more vnodes; pDnode->score is not modified. */ +float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) { + int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + + bnCalcBandScore(pDnode); + float moduleScore = bnCalcModuleScore(pDnode); + float vnodeScore = bnCalcVnodeScore(pDnode, extra); + + float score = systemScore + moduleScore + vnodeScore + pDnode->customScore; + return score; +} + +/* Register the show handlers of TSDB_MGMT_TABLE_SCORES and allocate the dnode list. */ +void bnInitDnodes() { + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, bnGetScoresMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, bnRetrieveScores); + mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode); + + memset(&tsBnDnodes, 0, sizeof(SBnDnodes)); + tsBnDnodes.list = calloc(16, sizeof(SDnodeObj *)); + /* only advertise capacity that was really allocated */ + tsBnDnodes.maxSize = (tsBnDnodes.list != NULL) ? 16 : 0; +} + +/* Free the dnode list and reset its bookkeeping so no stale size or capacity survives. */ +void bnCleanupDnodes() { + if (tsBnDnodes.list != NULL) { + free(tsBnDnodes.list); + tsBnDnodes.list = NULL; + } + tsBnDnodes.size = 0; + tsBnDnodes.maxSize = 0; +} + +/* Grow the list so it can hold more than dnodesNum entries; on allocation failure the old list and capacity are kept. */ +static void bnCheckDnodesSize(int32_t dnodesNum) { + if (tsBnDnodes.maxSize <= dnodesNum) { + int32_t maxSize = (dnodesNum > 0) ? dnodesNum * 2 : 16; + SDnodeObj **list = realloc(tsBnDnodes.list, maxSize * sizeof(SDnodeObj *)); + if (list == NULL) return; + tsBnDnodes.list = list; + tsBnDnodes.maxSize = maxSize; + } +} + +/* Collect every dnode that is not offline into tsBnDnodes.list, ordered by ascending score. The dnodes stored in the list are not dereferenced here; bnReleaseDnodes() calls mnodeDecDnodeRef() on each of them. */ +void bnAccquireDnodes() { + int32_t dnodesNum = mnodeGetDnodesNum(); + bnCheckDnodesSize(dnodesNum); + /* the list could not be grown: never index past its real capacity */ + if (dnodesNum > tsBnDnodes.maxSize) dnodesNum = tsBnDnodes.maxSize; + + void * pIter = NULL; + SDnodeObj *pDnode = NULL; + int32_t dnodeIndex = 0; + + while (1) { + if (dnodeIndex >= dnodesNum) { + mnodeCancelGetNextDnode(pIter); + break; + } + + pIter = mnodeGetNextDnode(pIter, &pDnode); + if (pDnode == NULL) break; + if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { + mnodeDecDnodeRef(pDnode); + continue; + } + + bnCalcDnodeScore(pDnode); + + /* insertion sort: shift entries whose score is not lower than the new one */ + int32_t orderIndex = dnodeIndex; + for (; orderIndex > 0; --orderIndex) { + if
(pDnode->score > tsBnDnodes.list[orderIndex - 1]->score) { + break; + } + tsBnDnodes.list[orderIndex] = tsBnDnodes.list[orderIndex - 1]; + } + tsBnDnodes.list[orderIndex] = pDnode; + dnodeIndex++; + } + + tsBnDnodes.size = dnodeIndex; +} + +/* Call mnodeDecDnodeRef() on every dnode stored by bnAccquireDnodes(). */ +void bnReleaseDnodes() { + for (int32_t i = 0; i < tsBnDnodes.size; ++i) { + SDnodeObj *pDnode = tsBnDnodes.list[i]; + if (pDnode != NULL) { + mnodeDecDnodeRef(pDnode); + } + } +} + +/* Describe the 9 columns of the scores table in pMeta/pShow; returns TSDB_CODE_MND_NO_RIGHTS unless the account user of the connection is "root". */ +static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (strcmp(pUser->pAcct->user, "root") != 0) { + mnodeDecUserRef(pUser); + return TSDB_CODE_MND_NO_RIGHTS; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "id"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "system scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "custom scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "module scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "vnode scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "total scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "open vnodes"); + pSchema[cols].bytes =
htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "cpu cores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "balance state"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + /* each column starts where the previous one ends */ + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = mnodeGetDnodesNum(); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->pIter = NULL; + + mnodeDecUserRef(pUser); + + return 0; +} + +/* Write up to `rows` dnodes into data, one column array after another, and return the number written; the module, vnode and total scores are truncated to integers before being stored as float. */ +static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + SDnodeObj *pDnode = NULL; + char * pWrite; + int32_t cols = 0; + + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode); + if (pDnode == NULL) break; + + int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + bnCalcBandScore(pDnode); + float moduleScore = bnCalcModuleScore(pDnode); + float vnodeScore = bnCalcVnodeScore(pDnode, 0); + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pDnode->dnodeId; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = systemScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = pDnode->customScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = (int32_t)moduleScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = (int32_t)vnodeScore; + 
cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pDnode->openVnodes; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pDnode->numOfCores; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status)); + cols++; + + numOfRows++; + mnodeDecDnodeRef(pDnode); + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} diff --git a/src/balance/src/bnThread.c b/src/balance/src/bnThread.c new file mode 100644 index 0000000000000000000000000000000000000000..a11bc61b01bb769873cc6c6509aab59054d0fdaa --- /dev/null +++ b/src/balance/src/bnThread.c @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "ttimer.h" +#include "tglobal.h" +#include "mnodeSdb.h" +#include "bnThread.h" + +static SBnThread tsBnThread; + +static void *bnThreadFunc(void *arg) { + while (1) { + pthread_mutex_lock(&tsBnThread.mutex); + if (tsBnThread.stop) { + pthread_mutex_unlock(&tsBnThread.mutex); + break; + } + + pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex); + bool updateSoon = bnStart(); + bnStartTimer(updateSoon ? 1000 : -1); + pthread_mutex_unlock(&(tsBnThread.mutex)); + } + + return NULL; +} + +int32_t bnInitThread() { + memset(&tsBnThread, 0, sizeof(SBnThread)); + tsBnThread.stop = false; + pthread_mutex_init(&tsBnThread.mutex, NULL); + pthread_cond_init(&tsBnThread.cond, NULL); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); + int32_t ret = pthread_create(&tsBnThread.thread, &thattr, bnThreadFunc, NULL); + pthread_attr_destroy(&thattr); + + if (ret != 0) { + mError("failed to create balance thread since %s", strerror(errno)); + return -1; + } + + bnStartTimer(2000); + mDebug("balance thread is created"); + return 0; +} + +void bnCleanupThread() { + mDebug("balance thread will be cleanup"); + + if (tsBnThread.timer != NULL) { + taosTmrStopA(&tsBnThread.timer); + tsBnThread.timer = NULL; + mDebug("stop balance timer"); + } + + pthread_mutex_lock(&tsBnThread.mutex); + tsBnThread.stop = true; + pthread_cond_signal(&tsBnThread.cond); + pthread_mutex_unlock(&(tsBnThread.mutex)); + pthread_join(tsBnThread.thread, NULL); + + pthread_cond_destroy(&tsBnThread.cond); + pthread_mutex_destroy(&tsBnThread.mutex); +} + +static void bnPostSignal() { + pthread_mutex_lock(&tsBnThread.mutex); + pthread_cond_signal(&tsBnThread.cond); + pthread_mutex_unlock(&(tsBnThread.mutex)); +} + +/* + * once sdb work as mater, then tsAccessSquence reset to zero + * increase tsAccessSquence every balance interval + */ + +static void bnProcessTimer(void *handle, void *tmrId) 
{ + if (!sdbIsMaster()) return; + if (tsBnThread.stop) return; + + tsBnThread.timer = NULL; + tsAccessSquence++; + + bnCheckStatus(); + bnStartTimer(-1); + + if (handle == NULL) { + if (tsAccessSquence % tsBalanceInterval == 0) { + mDebug("balance function is scheduled by timer"); + bnPostSignal(); + } + } else { + int64_t mseconds = (int64_t)handle; + mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds); + bnPostSignal(); + } +} + +void bnStartTimer(int64_t mseconds) { + if (tsBnThread.stop) return; + + bool updateSoon = (mseconds != -1); + if (updateSoon) { + taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer); + } else { + taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer); + } +} + +void bnNotify() { + bnStartTimer(500); +} diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index abbe26204d21b633319c23f62152f6283c5fe7ee..1e6f931fe39a18c7b0abbe13737038b500dd4f0a 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5103,7 +5103,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) { const int tokenDebugFlagEnd = 20; const SDNodeDynConfOption cfgOptions[] = { {"resetLog", 8}, {"resetQueryCache", 15}, {"balance", 7}, {"monitor", 7}, - {"debugFlag", 9}, {"monitorDebugFlag", 16}, {"vDebugFlag", 10}, {"mDebugFlag", 10}, + {"debugFlag", 9}, {"monDebugFlag", 12}, {"vDebugFlag", 10}, {"mDebugFlag", 10}, {"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12}, {"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12}, {"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12}, diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 6e4274b3587869a7f83032f7600b1ea1c3d6cb22..7ba7260af22be7e817e673d2f92756f661ef5703 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -179,7 +179,7 @@ extern int32_t 
tmrDebugFlag; extern int32_t sdbDebugFlag; extern int32_t httpDebugFlag; extern int32_t mqttDebugFlag; -extern int32_t monitorDebugFlag; +extern int32_t monDebugFlag; extern int32_t uDebugFlag; extern int32_t rpcDebugFlag; extern int32_t odbcDebugFlag; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 18aa0ae6e3c570a2414d21dc00924d4fb954c95a..dd4b738949419e6d3f92ef3c27d6e18601a44840 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -209,7 +209,7 @@ int32_t jniDebugFlag = 131; int32_t odbcDebugFlag = 131; int32_t httpDebugFlag = 131; int32_t mqttDebugFlag = 131; -int32_t monitorDebugFlag = 131; +int32_t monDebugFlag = 131; int32_t qDebugFlag = 131; int32_t rpcDebugFlag = 131; int32_t uDebugFlag = 131; @@ -219,9 +219,9 @@ int32_t wDebugFlag = 135; int32_t tsdbDebugFlag = 131; int32_t cqDebugFlag = 135; -int32_t (*monitorStartSystemFp)() = NULL; -void (*monitorStopSystemFp)() = NULL; -void (*monitorExecuteSQLFp)(char *sql) = NULL; +int32_t (*monStartSystemFp)() = NULL; +void (*monStopSystemFp)() = NULL; +void (*monExecuteSQLFp)(char *sql) = NULL; char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"}; @@ -238,7 +238,7 @@ void taosSetAllDebugFlag() { odbcDebugFlag = debugFlag; httpDebugFlag = debugFlag; mqttDebugFlag = debugFlag; - monitorDebugFlag = debugFlag; + monDebugFlag = debugFlag; qDebugFlag = debugFlag; rpcDebugFlag = debugFlag; uDebugFlag = debugFlag; @@ -279,15 +279,15 @@ bool taosCfgDynamicOptions(char *msg) { if (strncasecmp(cfg->option, "monitor", olen) == 0) { if (1 == vint) { - if (monitorStartSystemFp) { - (*monitorStartSystemFp)(); + if (monStartSystemFp) { + (*monStartSystemFp)(); uInfo("monitor is enabled"); } else { uError("monitor can't be updated, for monitor not initialized"); } } else { - if (monitorStopSystemFp) { - (*monitorStopSystemFp)(); + if (monStopSystemFp) { + (*monStopSystemFp)(); uInfo("monitor is disabled"); } else { uError("monitor can't be updated, for monitor not initialized"); @@ 
-310,8 +310,8 @@ bool taosCfgDynamicOptions(char *msg) { } if (strncasecmp(option, "resetQueryCache", 15) == 0) { - if (monitorExecuteSQLFp) { - (*monitorExecuteSQLFp)("resetQueryCache"); + if (monExecuteSQLFp) { + (*monExecuteSQLFp)("resetQueryCache"); uInfo("resetquerycache is executed"); } else { uError("resetquerycache can't be executed, for monitor not started"); @@ -1240,8 +1240,8 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "monitorDebugFlag"; - cfg.ptr = &monitorDebugFlag; + cfg.option = "monDebugFlag"; + cfg.ptr = &monDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; cfg.minValue = 0; diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 0d1a56cfc4516f34eed18d6a402158bb922aa6b4..5c01f6471659f18c63d5eeeded8c0c5d73cd1867 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -24,7 +24,7 @@ #include "tqueue.h" #include "tsync.h" #include "ttimer.h" -#include "tbalance.h" +#include "tbn.h" #include "tglobal.h" #include "dnode.h" #include "vnode.h" diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index bd9500ba51226d138fd3fe52f027d144d289681b..7faa3c8913a7cc98cb7de8f33763d533100a7715 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -78,10 +78,10 @@ static void dnodeAllocModules() { tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1); tsModule[TSDB_MOD_MONITOR].name = "monitor"; - tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem; - tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem; - tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem; - tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem; + tsModule[TSDB_MOD_MONITOR].initFp = monInitSystem; + tsModule[TSDB_MOD_MONITOR].cleanUpFp = monCleanupSystem; + tsModule[TSDB_MOD_MONITOR].startFp = monStartSystem; + tsModule[TSDB_MOD_MONITOR].stopFp = monStopSystem; if 
(tsEnableMonitorModule) { dnodeSetModuleStatus(TSDB_MOD_MONITOR); } diff --git a/src/inc/monitor.h b/src/inc/monitor.h index b1229cca6b1fc18ac9bef4d6ea506c9a0ef59627..1aefb0b84887270c7997c381402096006fe6eaee 100644 --- a/src/inc/monitor.h +++ b/src/inc/monitor.h @@ -47,13 +47,13 @@ typedef struct { int8_t accessState; } SAcctMonitorObj; -int32_t monitorInitSystem(); -int32_t monitorStartSystem(); -void monitorStopSystem(); -void monitorCleanUpSystem(); -void monitorSaveAcctLog(SAcctMonitorObj *pMonObj); -void monitorSaveLog(int32_t level, const char *const format, ...); -void monitorExecuteSQL(char *sql); +int32_t monInitSystem(); +int32_t monStartSystem(); +void monStopSystem(); +void monCleanupSystem(); +void monSaveAcctLog(SAcctMonitorObj *pMonObj); +void monSaveLog(int32_t level, const char *const format, ...); +void monExecuteSQL(char *sql); #ifdef __cplusplus } diff --git a/src/inc/tbalance.h b/src/inc/tbn.h similarity index 71% rename from src/inc/tbalance.h rename to src/inc/tbn.h index f0da4a37476157e235993ef8c2c32baedabc2a1e..b9f4e3c608a1ae3df3a4ea0dca32c7bf9d5820a9 100644 --- a/src/inc/tbalance.h +++ b/src/inc/tbn.h @@ -23,14 +23,14 @@ extern "C" { struct SVgObj; struct SDnodeObj; -int32_t balanceInit(); -void balanceCleanUp(); -void balanceAsyncNotify(); -void balanceSyncNotify(); -void balanceReset(); -int32_t balanceAllocVnodes(struct SVgObj *pVgroup); -int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId); -int32_t balanceDropDnode(struct SDnodeObj *pDnode); +int32_t bnInit(); +void bnCleanUp(); +void bnNotify(); +void bnCheckModules(); +void bnReset(); +int32_t bnAllocVnodes(struct SVgObj *pVgroup); +int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId); +int32_t bnDropDnode(struct SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/src/mnode/inc/mnodeInt.h b/src/mnode/inc/mnodeInt.h index 44626fd1672d5cf7e216e3fb256d434a4c27ca27..7a791d76e6796cfed22657f1bc7ffdb26890ea40 100644 --- 
a/src/mnode/inc/mnodeInt.h +++ b/src/mnode/inc/mnodeInt.h @@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag; #define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }} #define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }} -#define mLError(...) { monitorSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) } -#define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) } -#define mLInfo(...) { monitorSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) } +#define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) } +#define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) } +#define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) } #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 69e8f076e9d617da13a1158eaecacd9bb3981b51..c971a945aa37dac5530c5f08701e446d5ee79cea 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -20,7 +20,7 @@ #include "tgrant.h" #include "tglobal.h" #include "tname.h" -#include "tbalance.h" +#include "tbn.h" #include "tdataformat.h" #include "mnode.h" #include "mnodeDef.h" @@ -1004,7 +1004,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { mDebug("db:%s, all vgroups is altered", pDb->name); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); - balanceAsyncNotify(); + bnNotify(); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 65f40603923082a9ddebee8dd27f53135a942f5c..0ff50b23072b11ab8cc31b192ea51a3c1c3004ad 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -16,12 +16,12 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tgrant.h" -#include "tbalance.h" +#include "tbn.h" #include "tglobal.h" #include "tconfig.h" #include "tutil.h" #include "tsocket.h" -#include "tbalance.h" +#include "tbn.h" #include "tsync.h" #include "tdataformat.h" 
#include "mnode.h" @@ -115,7 +115,7 @@ static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) { mnodeDropAllDnodeVgroups(pDnode); #endif mnodeDropMnodeLocal(pDnode->dnodeId); - balanceAsyncNotify(); + bnNotify(); mnodeUpdateDnodeEps(); mDebug("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId); @@ -347,7 +347,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_INVALID_DNODE_CFG_OPTION; } - int32_t code = balanceAlterDnode(pDnode, vnodeId, dnodeId); + int32_t code = bnAlterDnode(pDnode, vnodeId, dnodeId); mnodeDecDnodeRef(pDnode); return code; } else { @@ -591,8 +591,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { mInfo("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; pDnode->offlineReason = TAOS_DN_OFF_ONLINE; - balanceSyncNotify(); - balanceAsyncNotify(); + bnCheckModules(); + bnNotify(); } if (openVnodes != pDnode->openVnodes) { @@ -708,7 +708,7 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) { #ifndef _SYNC int32_t code = mnodeDropDnode(pDnode, pMsg); #else - int32_t code = balanceDropDnode(pDnode); + int32_t code = bnDropDnode(pDnode); #endif mnodeDecDnodeRef(pDnode); return code; @@ -1182,12 +1182,12 @@ static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) { #ifndef _SYNC -int32_t balanceInit() { return TSDB_CODE_SUCCESS; } -void balanceCleanUp() {} -void balanceAsyncNotify() {} -void balanceSyncNotify() {} -void balanceReset() {} -int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; } +int32_t bnInit() { return TSDB_CODE_SUCCESS; } +void bnCleanUp() {} +void bnNotify() {} +void bnCheckModules() {} +void bnReset() {} +int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; } char* syncRole[] = { "offline", @@ -1197,7 +1197,7 @@ char* syncRole[] = { "master" }; -int32_t balanceAllocVnodes(SVgObj 
*pVgroup) { +int32_t bnAllocVnodes(SVgObj *pVgroup) { void * pIter = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index d15b32da54d245b08146703aaed08a5b43ce8170..86f2c821f9c4d088f8c057dacbf9c43b20270a90 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -17,7 +17,7 @@ #include "os.h" #include "taosdef.h" #include "tsched.h" -#include "tbalance.h" +#include "tbn.h" #include "tgrant.h" #include "ttimer.h" #include "tglobal.h" @@ -58,7 +58,7 @@ static const SMnodeComponent tsMnodeComponents[] = { {"tables", mnodeInitTables, mnodeCleanupTables}, {"mnodes", mnodeInitMnodes, mnodeCleanupMnodes}, {"sdb", sdbInit, sdbCleanUp}, - {"balance", balanceInit, balanceCleanUp}, + {"balance", bnInit, bnCleanUp}, {"grant", grantInit, grantCleanUp}, {"show", mnodeInitShow, mnodeCleanUpShow} }; diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 7428e8d2c8b1cdbde3c0c187f37b0fb3f6444ed0..d20d51f82b3a86506610e157203b2beb05a4373d 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -19,7 +19,7 @@ #include "tglobal.h" #include "trpc.h" #include "tsync.h" -#include "tbalance.h" +#include "tbn.h" #include "tutil.h" #include "tsocket.h" #include "tdataformat.h" diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index ef1b819018cfaf820a306aba1762e5dfcbdda338..cfb7b7781b04c0145f66a58c6d3d2eaefa0913f3 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -20,7 +20,7 @@ #include "tsystem.h" #include "tutil.h" #include "tgrant.h" -#include "tbalance.h" +#include "tbn.h" #include "tglobal.h" #include "mnode.h" #include "dnode.h" diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index a39d35506f64f91c8b6bb5e89ff096da090ab9f4..c2a70bc01d2194fff5b8095d06e40535b940dc25 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -17,7 +17,7 @@ #include "os.h" #include "taosdef.h" 
#include "tsched.h" -#include "tbalance.h" +#include "tbn.h" #include "tgrant.h" #include "ttimer.h" #include "tglobal.h" diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 40e2e1cfcc77e7059f980e7ad32363fd8a4323d0..2ef758baf19b3f4196699553db97cc1f954794f7 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -19,7 +19,7 @@ #include "hash.h" #include "tutil.h" #include "tref.h" -#include "tbalance.h" +#include "tbn.h" #include "tqueue.h" #include "twal.h" #include "tsync.h" @@ -244,7 +244,7 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]); if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) { - balanceReset(); + bnReset(); } tsSdbMgmt.role = role; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 3e974f417ff09f4663e25c6c62d1d341457285ff..d3020de6bd32dfcee97a068181d1e957e7a1a62a 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -20,7 +20,7 @@ #include "tsocket.h" #include "tidpool.h" #include "tsync.h" -#include "tbalance.h" +#include "tbn.h" #include "tglobal.h" #include "tdataformat.h" #include "dnode.h" @@ -563,7 +563,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->createdTime = taosGetTimestampMs(); pVgroup->accessState = TSDB_VN_ALL_ACCCESS; - int32_t code = balanceAllocVnodes(pVgroup); + int32_t code = bnAllocVnodes(pVgroup); if (code != TSDB_CODE_SUCCESS) { mError("db:%s, no enough dnode to alloc %d vnodes to vgroup, reason:%s", pDb->name, pVgroup->numOfVnodes, tstrerror(code)); diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index f0cfe1aedc803f35ad31c6c8d613a90e41a44e62..53981238a76ee3976689f77315eabab118816e81 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -17,7 +17,7 @@ #include "os.h" #include "taosdef.h" #include "tsched.h" 
-#include "tbalance.h" +#include "tbn.h" #include "tgrant.h" #include "tglobal.h" #include "trpc.h" diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monMain.c similarity index 70% rename from src/plugins/monitor/src/monitorMain.c rename to src/plugins/monitor/src/monMain.c index 24998b54cd1a0114697c411fa145465f52e1e48a..9443b1ce12deba75a070f25ae4f67f7887870f7d 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monMain.c @@ -27,12 +27,12 @@ #include "monitor.h" #include "taoserror.h" -#define mnFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} -#define mnError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} -#define mnWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }} -#define mnInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }} -#define mnDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} -#define mnTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} +#define monFatal(...) { if (monDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} +#define monError(...) { if (monDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} +#define monWarn(...) { if (monDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }} +#define monInfo(...) { if (monDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }} +#define monDebug(...) { if (monDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }} +#define monTrace(...) 
{ if (monDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }} #define SQL_LENGTH 1030 #define LOG_LEN_STR 100 @@ -48,12 +48,12 @@ typedef enum { MON_CMD_CREATE_TB_ACCT_ROOT, MON_CMD_CREATE_TB_SLOWQUERY, MON_CMD_MAX -} EMonitorCommand; +} EMonCmd; typedef enum { MON_STATE_NOT_INIT, MON_STATE_INITED -} EMonitorState; +} EMonState; typedef struct { pthread_t thread; @@ -64,17 +64,17 @@ typedef struct { int8_t start; // enable/disable by mnode int8_t quiting; // taosd is quiting char sql[SQL_LENGTH + 1]; -} SMonitorConn; +} SMonConn; -static SMonitorConn tsMonitor = {0}; -static void monitorSaveSystemInfo(); -static void *monitorThreadFunc(void *param); -static void monitorBuildMonitorSql(char *sql, int32_t cmd); -extern int32_t (*monitorStartSystemFp)(); -extern void (*monitorStopSystemFp)(); -extern void (*monitorExecuteSQLFp)(char *sql); +static SMonConn tsMonitor = {0}; +static void monSaveSystemInfo(); +static void *monThreadFunc(void *param); +static void monBuildMonitorSql(char *sql, int32_t cmd); +extern int32_t (*monStartSystemFp)(); +extern void (*monStopSystemFp)(); +extern void (*monExecuteSQLFp)(char *sql); -int32_t monitorInitSystem() { +int32_t monInitSystem() { if (tsMonitor.ep[0] == 0) { strcpy(tsMonitor.ep, tsLocalEp); } @@ -90,29 +90,29 @@ int32_t monitorInitSystem() { pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) { - mnError("failed to create thread to for monitor module, reason:%s", strerror(errno)); + if (pthread_create(&tsMonitor.thread, &thAttr, monThreadFunc, NULL)) { + monError("failed to create thread to for monitor module, reason:%s", strerror(errno)); return -1; } pthread_attr_destroy(&thAttr); - mnDebug("monitor thread is launched"); + monDebug("monitor thread is launched"); - monitorStartSystemFp = monitorStartSystem; - monitorStopSystemFp = monitorStopSystem; + monStartSystemFp = 
monStartSystem; + monStopSystemFp = monStopSystem; return 0; } -int32_t monitorStartSystem() { +int32_t monStartSystem() { taos_init(); tsMonitor.start = 1; - monitorExecuteSQLFp = monitorExecuteSQL; - mnInfo("monitor module start"); + monExecuteSQLFp = monExecuteSQL; + monInfo("monitor module start"); return 0; } -static void *monitorThreadFunc(void *param) { - mnDebug("starting to initialize monitor module ..."); +static void *monThreadFunc(void *param) { + monDebug("starting to initialize monitor module ..."); while (1) { static int32_t accessTimes = 0; @@ -121,7 +121,7 @@ static void *monitorThreadFunc(void *param) { if (tsMonitor.quiting) { tsMonitor.state = MON_STATE_NOT_INIT; - mnInfo("monitor thread will quit, for taosd is quiting"); + monInfo("monitor thread will quit, for taosd is quiting"); break; } else { taosGetDisk(); @@ -132,7 +132,7 @@ static void *monitorThreadFunc(void *param) { } if (dnodeGetDnodeId() <= 0) { - mnDebug("dnode not initialized, waiting for 3000 ms to start monitor module"); + monDebug("dnode not initialized, waiting for 3000 ms to start monitor module"); continue; } @@ -140,10 +140,10 @@ static void *monitorThreadFunc(void *param) { tsMonitor.state = MON_STATE_NOT_INIT; tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0); if (tsMonitor.conn == NULL) { - mnError("failed to connect to database, reason:%s", tstrerror(terrno)); + monError("failed to connect to database, reason:%s", tstrerror(terrno)); continue; } else { - mnDebug("connect to database success"); + monDebug("connect to database success"); } } @@ -151,16 +151,16 @@ static void *monitorThreadFunc(void *param) { int code = 0; for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) { - monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); + monBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); void *res = taos_query(tsMonitor.conn, tsMonitor.sql); code = taos_errno(res); taos_free_result(res); if (code != 0) { - mnError("failed to exec sql:%s, 
reason:%s", tsMonitor.sql, tstrerror(code)); + monError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code)); break; } else { - mnDebug("successfully to exec sql:%s", tsMonitor.sql); + monDebug("successfully to exec sql:%s", tsMonitor.sql); } } @@ -171,16 +171,16 @@ static void *monitorThreadFunc(void *param) { if (tsMonitor.state == MON_STATE_INITED) { if (accessTimes % tsMonitorInterval == 0) { - monitorSaveSystemInfo(); + monSaveSystemInfo(); } } } - mnInfo("monitor thread is stopped"); + monInfo("monitor thread is stopped"); return NULL; } -static void monitorBuildMonitorSql(char *sql, int32_t cmd) { +static void monBuildMonitorSql(char *sql, int32_t cmd) { memset(sql, 0, SQL_LENGTH); if (cmd == MON_CMD_CREATE_DB) { @@ -236,47 +236,47 @@ static void monitorBuildMonitorSql(char *sql, int32_t cmd) { sql[SQL_LENGTH] = 0; } -void monitorStopSystem() { +void monStopSystem() { tsMonitor.start = 0; tsMonitor.state = MON_STATE_NOT_INIT; - monitorExecuteSQLFp = NULL; - mnInfo("monitor module stopped"); + monExecuteSQLFp = NULL; + monInfo("monitor module stopped"); } -void monitorCleanUpSystem() { +void monCleanupSystem() { tsMonitor.quiting = 1; - monitorStopSystem(); + monStopSystem(); pthread_join(tsMonitor.thread, NULL); if (tsMonitor.conn != NULL) { taos_close(tsMonitor.conn); tsMonitor.conn = NULL; } - mnInfo("monitor module is cleaned up"); + monInfo("monitor module is cleaned up"); } // unit is MB -static int32_t monitorBuildMemorySql(char *sql) { +static int32_t monBuildMemorySql(char *sql) { float sysMemoryUsedMB = 0; bool suc = taosGetSysMemory(&sysMemoryUsedMB); if (!suc) { - mnDebug("failed to get sys memory info"); + monDebug("failed to get sys memory info"); } float procMemoryUsedMB = 0; suc = taosGetProcMemory(&procMemoryUsedMB); if (!suc) { - mnDebug("failed to get proc memory info"); + monDebug("failed to get proc memory info"); } return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB); } // unit is % 
-static int32_t monitorBuildCpuSql(char *sql) { +static int32_t monBuildCpuSql(char *sql) { float sysCpuUsage = 0, procCpuUsage = 0; bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage); if (!suc) { - mnDebug("failed to get cpu usage"); + monDebug("failed to get cpu usage"); } if (sysCpuUsage <= procCpuUsage) { @@ -287,72 +287,72 @@ static int32_t monitorBuildCpuSql(char *sql) { } // unit is GB -static int32_t monitorBuildDiskSql(char *sql) { +static int32_t monBuildDiskSql(char *sql) { return sprintf(sql, ", %f, %d", (tsTotalDataDirGB - tsAvailDataDirGB), (int32_t)tsTotalDataDirGB); } // unit is Kb -static int32_t monitorBuildBandSql(char *sql) { +static int32_t monBuildBandSql(char *sql) { float bandSpeedKb = 0; bool suc = taosGetBandSpeed(&bandSpeedKb); if (!suc) { - mnDebug("failed to get bandwidth speed"); + monDebug("failed to get bandwidth speed"); } return sprintf(sql, ", %f", bandSpeedKb); } -static int32_t monitorBuildReqSql(char *sql) { +static int32_t monBuildReqSql(char *sql) { SStatisInfo info = dnodeGetStatisInfo(); return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum); } -static int32_t monitorBuildIoSql(char *sql) { +static int32_t monBuildIoSql(char *sql) { float readKB = 0, writeKB = 0; bool suc = taosGetProcIO(&readKB, &writeKB); if (!suc) { - mnDebug("failed to get io info"); + monDebug("failed to get io info"); } return sprintf(sql, ", %f, %f", readKB, writeKB); } -static void monitorSaveSystemInfo() { +static void monSaveSystemInfo() { int64_t ts = taosGetTimestampUs(); char * sql = tsMonitor.sql; int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts); - pos += monitorBuildCpuSql(sql + pos); - pos += monitorBuildMemorySql(sql + pos); - pos += monitorBuildDiskSql(sql + pos); - pos += monitorBuildBandSql(sql + pos); - pos += monitorBuildIoSql(sql + pos); - pos += monitorBuildReqSql(sql + pos); + pos += monBuildCpuSql(sql + pos); + pos += 
monBuildMemorySql(sql + pos); + pos += monBuildDiskSql(sql + pos); + pos += monBuildBandSql(sql + pos); + pos += monBuildIoSql(sql + pos); + pos += monBuildReqSql(sql + pos); void *res = taos_query(tsMonitor.conn, tsMonitor.sql); int code = taos_errno(res); taos_free_result(res); if (code != 0) { - mnError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql); + monError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql); } else { - mnDebug("successfully to save system info, sql:%s", tsMonitor.sql); + monDebug("successfully to save system info, sql:%s", tsMonitor.sql); } } -static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) { +static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) { int32_t c = taos_errno(result); if (c != TSDB_CODE_SUCCESS) { - mnError("save %s failed, reason:%s", (char *)param, tstrerror(c)); + monError("save %s failed, reason:%s", (char *)param, tstrerror(c)); } else { int32_t rows = taos_affected_rows(result); - mnDebug("save %s succ, rows:%d", (char *)param, rows); + monDebug("save %s succ, rows:%d", (char *)param, rows); } taos_free_result(result); } -void monitorSaveAcctLog(SAcctMonitorObj *pMon) { +void monSaveAcctLog(SAcctMonitorObj *pMon) { if (tsMonitor.state != MON_STATE_INITED) return; char sql[1024] = {0}; @@ -382,11 +382,11 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) { pMon->totalConns, pMon->maxConns, pMon->accessState); - mnDebug("save account info, sql:%s", sql); - taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info"); + monDebug("save account info, sql:%s", sql); + taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "account info"); } -void monitorSaveLog(int32_t level, const char *const format, ...) { +void monSaveLog(int32_t level, const char *const format, ...) { if (tsMonitor.state != MON_STATE_INITED) return; va_list argpointer; @@ -403,13 +403,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) 
{ len += sprintf(sql + len, "', '%s')", tsLocalEp); sql[len++] = 0; - mnDebug("save log, sql: %s", sql); - taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log"); + monDebug("save log, sql: %s", sql); + taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log"); } -void monitorExecuteSQL(char *sql) { +void monExecuteSQL(char *sql) { if (tsMonitor.state != MON_STATE_INITED) return; - mnDebug("execute sql:%s", sql); - taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql"); + monDebug("execute sql:%s", sql); + taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql"); } diff --git a/tests/script/general/alter/dnode.sim b/tests/script/general/alter/dnode.sim index 20ce8799791eaf2aa6198bc07d4abde70a6f31e0..73a095ec054320d0373f137250193bc4d8230234 100644 --- a/tests/script/general/alter/dnode.sim +++ b/tests/script/general/alter/dnode.sim @@ -24,7 +24,7 @@ sql alter dnode 1 debugFlag 135 sql alter dnode 1 debugFlag 131 sql alter dnode 1 monitor 0 sql alter dnode 1 debugFlag 135 -sql alter dnode 1 monitorDebugFlag 135 +sql alter dnode 1 monDebugFlag 135 sql alter dnode 1 vDebugFlag 135 sql alter dnode 1 mDebugFlag 135 sql alter dnode 1 cDebugFlag 135 @@ -44,15 +44,15 @@ sql_error alter dnode 2 tmrDebugFlag 135 print ======== step3 sql_error alter $hostname1 debugFlag 135 -sql_error alter $hostname1 monitorDebugFlag 135 +sql_error alter $hostname1 monDebugFlag 135 sql_error alter $hostname1 vDebugFlag 135 sql_error alter $hostname1 mDebugFlag 135 sql_error alter dnode $hostname2 debugFlag 135 -sql_error alter dnode $hostname2 monitorDebugFlag 135 +sql_error alter dnode $hostname2 monDebugFlag 135 sql_error alter dnode $hostname2 vDebugFlag 135 sql_error alter dnode $hostname2 mDebugFlag 135 sql alter dnode $hostname1 debugFlag 135 -sql alter dnode $hostname1 monitorDebugFlag 135 +sql alter dnode $hostname1 monDebugFlag 135 sql alter dnode $hostname1 vDebugFlag 135 sql alter dnode $hostname1 tmrDebugFlag 131 diff --git a/tests/script/sh/deploy.sh 
b/tests/script/sh/deploy.sh index e26778e86b9876ab6e6f47f4b41207ea35cafbf5..cd2f3772eb11a0c5d3a7a69fcb3d62162eadf14c 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -120,7 +120,7 @@ echo "cDebugFlag 143" >> $TAOS_CFG echo "jnidebugFlag 143" >> $TAOS_CFG echo "odbcdebugFlag 143" >> $TAOS_CFG echo "httpDebugFlag 143" >> $TAOS_CFG -echo "monitorDebugFlag 143" >> $TAOS_CFG +echo "monDebugFlag 143" >> $TAOS_CFG echo "mqttDebugFlag 143" >> $TAOS_CFG echo "qdebugFlag 143" >> $TAOS_CFG echo "rpcDebugFlag 143" >> $TAOS_CFG