diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index c5934bcdc4b06f8eea97bdb2f79b56753f8b8df7..d8273738c37ca20b738c0afdc635630ec153a38c 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -143,8 +143,6 @@ void dnodeSendStatusMsgToMgmt(void *handle, void *tmrId) { // } dnodeSendMsgToMnode(TSDB_MSG_TYPE_STATUS, pStatus, contLen); - - //grantSendMsgToMgmt(); } diff --git a/src/dnode/src/dnodeVnodeMgmt.c b/src/dnode/src/dnodeVnodeMgmt.c index cf43f87aaae1882f52bac362bdad294d3d475f2e..fd1a0b6f2857799813448e8bd73e0b3faa50aa3f 100644 --- a/src/dnode/src/dnodeVnodeMgmt.c +++ b/src/dnode/src/dnodeVnodeMgmt.c @@ -59,3 +59,6 @@ bool dnodeCheckTableExist(int32_t vnode, int32_t sid, int64_t uid) { return true; } +int32_t dnodeGetVnodesNum() { + return 1; +} diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 7de97a21f88490bafc21707608d43a2d543b06a9..34c8b8c77e4821ebb7a1aab1c4e148ffd3893302 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -258,10 +258,10 @@ typedef struct { } SShowObj; //mgmtSystem +int32_t mgmtInitSystem(); int32_t mgmtStartSystem(); -void mgmtCleanUpSystem(); -extern void (*mgmtCleanUpRedirect)(); - +void mgmtCleanUpSystem(); +void mgmtStopSystem(); void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 336811105c2ded4f8018b7d50837fae5b9ee31b7..61331b9b3d3c8d90cfe48550511e44e07a8fbd5e 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -20,19 +20,13 @@ extern "C" { #endif -#include "os.h" #include "mnode.h" -extern void (*mgmtStartBalanceTimer)(int64_t mseconds); -extern int32_t (*mgmtInitBalance)(); -extern void (*mgmtCleanupBalance)(); -extern int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup); -extern bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType); -extern char* (*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode); -extern bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode); -extern void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus); -extern void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp); -bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode); +void mgmtStartBalanceTimer(int64_t mseconds); +int32_t mgmtInitBalance(); +void mgmtCleanupBalance(); +int32_t mgmtAllocVnodes(SVgObj *pVgroup); +char* mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 15e3a8550aa45f479d088404e7095d0f8ef0b7b5..4cdac1e7afd367fedcab36a07f6e36d26a215551 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -34,6 +34,7 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode); int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); +bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType); int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn); diff --git a/src/mnode/inc/mgmtSystem.h b/src/mnode/inc/mgmtSystem.h index 2ea052287d908f28c036c520403edda084dd34bb..5d71809f36de9ab44797c907c89fad44de0d47f6 100644 --- a/src/mnode/inc/mgmtSystem.h +++ b/src/mnode/inc/mgmtSystem.h @@ -27,10 +27,6 @@ int32_t mgmtStartSystem(); void mgmtCleanUpSystem(); void mgmtStopSystem(); - - -extern void (*mgmtCleanUpRedirect)(); - #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 13c12162fb39b069b586d418ab72d3da6d4cfbbb..1e5fc54c5a82102b5a0dd8db840be8d1ccc0bc63 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -21,76 +21,67 @@ #include "mgmtBalance.h" #include "mgmtDnode.h" -void mgmtStartBalanceTimerImp(int64_t mseconds) {} -void (*mgmtStartBalanceTimer)(int64_t mseconds) = mgmtStartBalanceTimerImp; - -int32_t mgmtInitBalanceImp() { return 0; } -int32_t (*mgmtInitBalance)() = mgmtInitBalanceImp; - -void mgmtCleanupBalanceImp() {} -void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp; - -int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) { -// int selectedVnode = -1; -// int lastAllocVode = pDnode->lastAllocVnode; -// -// for (int i = 0; i < pDnode->numOfVnodes; i++) { -// int vnode = (i + lastAllocVode) % pDnode->numOfVnodes; -// if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_OFFLINE) { -// selectedVnode = vnode; -// break; -// } -// } -// -// if (selectedVnode == -1) { -// mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes); -// return -1; -// } else { -// mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); -// pVgroup->vnodeGid[0].vnode = selectedVnode; -// pDnode->lastAllocVnode = selectedVnode + 1; -// if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; -// return 0; -// } - return 0; +void (*mgmtStartBalanceTimerFp)(int64_t mseconds) = NULL; +int32_t (*mgmtInitBalanceFp)() = NULL; +void (*mgmtCleanupBalanceFp)() = NULL; +int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL; +char * (*mgmtGetVnodeStatusFp)(SVgObj *pVgroup, SVnodeGid *pVnode) = NULL; + +void mgmtStartBalanceTimer(int64_t mseconds) { + if (mgmtStartBalanceTimerFp) { + (*mgmtStartBalanceTimerFp)(mseconds); + } } -int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup) = mgmtAllocVnodesImp; -bool mgmtCheckModuleInDnodeImp(SDnodeObj *pDnode, int moduleType) { - return tsModule[moduleType].num != 0; +int32_t mgmtInitBalance() { + if (mgmtInitBalanceFp) { + return (*mgmtInitBalanceFp)(); + } else { + return 0; + } } -bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType) = mgmtCheckModuleInDnodeImp; - -char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) { - return "master"; +void mgmtCleanupBalance() { + if (mgmtCleanupBalanceFp) { + (*mgmtCleanupBalanceFp)(); + } } -char *(*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtGetVnodeStatusImp; - -bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { - return true; -} +int32_t mgmtAllocVnodes(SVgObj *pVgroup) { + if (mgmtAllocVnodesFp) { + return mgmtAllocVnodesFp(pVgroup); + } -bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtCheckVnodeReadyImp; + SDnodeObj *pDnode = mgmtGetDnode(0); + if (pDnode == NULL) return TSDB_CODE_OTHERS; -void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) { -} + int selectedVnode = -1; + int lastAllocVode = pDnode->lastAllocVnode; -void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus) = mgmtUpdateDnodeStateImp; + for (int i = 0; i < pDnode->numOfVnodes; i++) { + int vnode = (i + lastAllocVode) % pDnode->numOfVnodes; + if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_OFFLINE) { + selectedVnode = vnode; + break; + } + } -void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) { + if (selectedVnode == -1) { + mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes); + return -1; + } else { + mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode); + pVgroup->vnodeGid[0].vnode = selectedVnode; + pDnode->lastAllocVnode = selectedVnode + 1; + if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0; + return 0; + } } -void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp) = mgmtUpdateVgroupStateImp; - - -bool (*mgmtAddVnodeFp)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) = NULL; -bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { - if (mgmtAddVnodeFp) { - return mgmtAddVnodeFp(pVgroup, pSrcDnode, pDestDnode); +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { + if (mgmtGetVnodeStatusFp) { + return (*mgmtGetVnodeStatusFp)(pVgroup, pVnode); } else { - return false; + return "master"; } } - diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 857e05fddc7297b4349b65b067ccb510fb02f28d..bedd51dcffa23cc791b301acc708778b7db60490 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -409,68 +409,69 @@ void mgmtMonitorDbDrop(void *unused, void *unusedt) { } int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { - int32_t code = TSDB_CODE_SUCCESS; - - SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); - if (pDb == NULL) { - mTrace("db:%s is not exist", pAlter->db); - return TSDB_CODE_INVALID_DB; - } - - int32_t oldReplicaNum = pDb->cfg.replications; - if (pAlter->daysToKeep > 0) { - mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); - pDb->cfg.daysToKeep = pAlter->daysToKeep; - } else if (pAlter->replications > 0) { - mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications); - if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) { - mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); - return TSDB_CODE_INVALID_OPTION; - } - pDb->cfg.replications = pAlter->replications; - } else if (pAlter->maxSessions > 0) { - mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions); - if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) { - mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); - return TSDB_CODE_INVALID_OPTION; - } - if (pAlter->maxSessions < pDb->cfg.maxSessions) { - mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions); - return TSDB_CODE_INVALID_OPTION; - } - return TSDB_CODE_INVALID_OPTION; - //The modification of tables needs to rewrite the head file, so disable this option - //pDb->cfg.maxSessions = pAlter->maxSessions; - } else { - mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name, - pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep, - pDb->cfg.replications, pDb->cfg.daysToKeep); - return TSDB_CODE_INVALID_OPTION; - } - - if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) { - return TSDB_CODE_SDB_ERROR; - } - - SVgObj *pVgroup = pDb->pHead; - while (pVgroup != NULL) { - mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); - if (oldReplicaNum < pDb->cfg.replications) { - if (!mgmtAddVnode(pVgroup, NULL, NULL)) { - mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); - code = TSDB_CODE_NO_ENOUGH_DNODES; - } - } - if (pAlter->maxSessions > 0) { - //rebuild meterList in mgmtVgroup.c - mgmtUpdateVgroup(pVgroup); - } -// mgmtSendCreateVnodeMsg(pVgroup); - pVgroup = pVgroup->next; - } - mgmtStartBalanceTimer(10); - - return code; + return 0; +// int32_t code = TSDB_CODE_SUCCESS; +// +// SDbObj *pDb = (SDbObj *) sdbGetRow(tsDbSdb, pAlter->db); +// if (pDb == NULL) { +// mTrace("db:%s is not exist", pAlter->db); +// return TSDB_CODE_INVALID_DB; +// } +// +// int32_t oldReplicaNum = pDb->cfg.replications; +// if (pAlter->daysToKeep > 0) { +// mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep); +// pDb->cfg.daysToKeep = pAlter->daysToKeep; +// } else if (pAlter->replications > 0) { +// mTrace("db:%s replica:%d change to %d", pDb->name, pDb->cfg.replications, pAlter->replications); +// if (pAlter->replications < TSDB_REPLICA_MIN_NUM || pAlter->replications > TSDB_REPLICA_MAX_NUM) { +// mError("invalid db option replica: %d valid range: %d--%d", pAlter->replications, TSDB_REPLICA_MIN_NUM, TSDB_REPLICA_MAX_NUM); +// return TSDB_CODE_INVALID_OPTION; +// } +// pDb->cfg.replications = pAlter->replications; +// } else if (pAlter->maxSessions > 0) { +// mTrace("db:%s tables:%d change to %d", pDb->name, pDb->cfg.maxSessions, pAlter->maxSessions); +// if (pAlter->maxSessions < TSDB_MIN_TABLES_PER_VNODE || pAlter->maxSessions > TSDB_MAX_TABLES_PER_VNODE) { +// mError("invalid db option tables: %d valid range: %d--%d", pAlter->maxSessions, TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE); +// return TSDB_CODE_INVALID_OPTION; +// } +// if (pAlter->maxSessions < pDb->cfg.maxSessions) { +// mError("invalid db option tables: %d should larger than original:%d", pAlter->maxSessions, pDb->cfg.maxSessions); +// return TSDB_CODE_INVALID_OPTION; +// } +// return TSDB_CODE_INVALID_OPTION; +// //The modification of tables needs to rewrite the head file, so disable this option +// //pDb->cfg.maxSessions = pAlter->maxSessions; +// } else { +// mError("db:%s alter msg, replica:%d, keep:%d, tables:%d, origin replica:%d keep:%d", pDb->name, +// pAlter->replications, pAlter->maxSessions, pAlter->daysToKeep, +// pDb->cfg.replications, pDb->cfg.daysToKeep); +// return TSDB_CODE_INVALID_OPTION; +// } +// +// if (sdbUpdateRow(tsDbSdb, pDb, tsDbUpdateSize, 1) < 0) { +// return TSDB_CODE_SDB_ERROR; +// } +// +// SVgObj *pVgroup = pDb->pHead; +// while (pVgroup != NULL) { +// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); +// if (oldReplicaNum < pDb->cfg.replications) { +// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { +// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); +// code = TSDB_CODE_NO_ENOUGH_DNODES; +// } +// } +// if (pAlter->maxSessions > 0) { +// //rebuild meterList in mgmtVgroup.c +// mgmtUpdateVgroup(pVgroup); +// } +//// mgmtSendCreateVnodeMsg(pVgroup); +// pVgroup = pVgroup->next; +// } +// mgmtStartBalanceTimer(10); +// +// return code; } int32_t mgmtAddVgroupIntoDb(SDbObj *pDb, SVgObj *pVgroup) { diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 8c85d4f5f65dcf0ba42ee1f630c89073515e7c07..20043c380081f86fbc025f2ce849cb4d9157c0da 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -34,7 +34,7 @@ int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pCon int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; -static SDnodeObj tsDnodeObj; +static SDnodeObj tsDnodeObj = {0}; void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { int32_t maxVnodes = pDnode->numOfCores * tsNumOfVnodesPerCore; @@ -225,6 +225,11 @@ int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon return numOfRows; } +bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { + uint32_t status = pDnode->moduleStatus & (1 << moduleType); + return status > 0; +} + int32_t mgmtGetModuleMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; @@ -543,6 +548,14 @@ int32_t mgmtInitDnodes() { tsDnodeObj.thandle = (void *) (1); //hack way tsDnodeObj.status = TSDB_DN_STATUS_READY; mgmtSetDnodeMaxVnodes(&tsDnodeObj); + + tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT); + if (tsEnableHttpModule) { + tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_HTTP); + } + if (tsEnableMonitorModule) { + tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR); + } return 0; } } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 92f43e64f6a0c62fd67f12e36594357fa227d8c0..1c60312f3eebb0bd0e8909f4e7b4aae003070fae 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -91,8 +91,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); - pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); - if (pMnode == NULL) break; +// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); +// if (pMnode == NULL) break; cols = 0; diff --git a/src/mnode/src/mgmtSystem.c b/src/mnode/src/mgmtSystem.c index bf8763cf7f98ec8aa8c325ab1f1776fd3e477687..e36788a5ddb79896d9e89a958bdcba2999af784a 100644 --- a/src/mnode/src/mgmtSystem.c +++ b/src/mnode/src/mgmtSystem.c @@ -38,7 +38,6 @@ void *tsMgmtTranQhandle = NULL; void mgmtCleanUpSystem() { mPrint("starting to clean up mgmt"); - mgmtCleanUpRedirect(); sdbCleanUpPeers(); mgmtCleanupBalance(); mgmtCleanUpDnodeInt(); @@ -63,8 +62,8 @@ int32_t mgmtCheckMgmtRunning() { tsetModuleStatus(TSDB_MOD_MGMT); // strcpy(sdbMasterIp, mgmtIpStr[0]); - strcpy(sdbPrivateIp, tsPrivateIp); - sdbPublicIp = inet_addr(tsPublicIp); +// strcpy(sdbPrivateIp, tsPrivateIp); +// sdbPublicIp = inet_addr(tsPublicIp); return 0; }