From fe12d0fe110fb52d7bbc30d26dd6bfb449f297f8 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 16 Apr 2020 17:17:03 +0800 Subject: [PATCH] add cluster codes --- src/dnode/src/dnodeMain.c | 4 + src/inc/dnode.h | 2 + src/inc/mnode.h | 15 +- src/inc/{taccount.h => tacct.h} | 20 +- src/mnode/inc/mgmtAcct.h | 44 ++ src/mnode/inc/mgmtDb.h | 2 +- src/{inc/tcluster.h => mnode/inc/mgmtDnode.h} | 29 +- src/mnode/src/mgmtAcct.c | 171 +++++- src/mnode/src/mgmtBalance.c | 10 +- src/mnode/src/mgmtDClient.c | 2 +- src/mnode/src/mgmtDb.c | 22 +- src/mnode/src/mgmtDnode.c | 580 +++++++++++++++--- src/mnode/src/mgmtMain.c | 12 +- src/mnode/src/mgmtProfile.c | 10 +- src/mnode/src/mgmtShell.c | 4 +- src/mnode/src/mgmtTable.c | 56 +- src/mnode/src/mgmtUser.c | 16 +- src/mnode/src/mgmtVgroup.c | 26 +- 18 files changed, 789 insertions(+), 236 deletions(-) rename src/inc/{taccount.h => tacct.h} (58%) create mode 100644 src/mnode/inc/mgmtAcct.h rename src/{inc/tcluster.h => mnode/inc/mgmtDnode.h} (60%) diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 5fba941788..c80de0ce6e 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -229,3 +229,7 @@ static int32_t dnodeInitStorage() { } static void dnodeCleanupStorage() {} + +bool dnodeIsFirstDeploy() { + return strcmp(tsMasterIp, tsPrivateIp) == 0; +} \ No newline at end of file diff --git a/src/inc/dnode.h b/src/inc/dnode.h index db39906c68..25ec747ac9 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -44,6 +44,8 @@ void *dnodeAllocateRqueue(void *pVnode); void dnodeFreeRqueue(void *rqueue); void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); +bool dnodeIsFirstDeploy(); + #ifdef __cplusplus } #endif diff --git a/src/inc/mnode.h b/src/inc/mnode.h index f0407aa9e4..a5817ac9df 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -60,14 +60,16 @@ typedef struct _dnode_obj { int32_t dnodeId; uint32_t privateIp; uint32_t publicIp; + uint16_t mnodeShellPort; + uint16_t mnodeDnodePort; + uint16_t dnodeShellPort; + uint16_t dnodeMnodePort; + uint16_t syncPort; uint32_t moduleStatus; int64_t createdTime; uint32_t lastAccess; int32_t openVnodes; - int32_t numOfTotalVnodes; // from dnode status msg, config information - uint32_t rack; - uint16_t idc; - uint16_t slot; + int32_t totalVnodes; // from dnode status msg, config information uint16_t numOfCores; // from dnode status msg int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode int8_t status; // set in balance function @@ -88,7 +90,6 @@ typedef struct _dnode_obj { typedef struct { int32_t dnodeId; - uint16_t port; uint32_t privateIp; uint32_t publicIp; } SVnodeGid; @@ -209,10 +210,10 @@ typedef struct _acct_obj { SAcctCfg cfg; int32_t acctId; int64_t createdTime; - int8_t dirty; + int8_t status; int8_t reserved[14]; int8_t updateEnd[1]; - int32_t refCount; + int32_t refCount; SAcctInfo acctInfo; pthread_mutex_t mutex; } SAcctObj; diff --git a/src/inc/taccount.h b/src/inc/tacct.h similarity index 58% rename from src/inc/taccount.h rename to src/inc/tacct.h index 18a974a574..52215fac52 100644 --- a/src/inc/taccount.h +++ b/src/inc/tacct.h @@ -20,27 +20,15 @@ extern "C" { #endif -struct _acct_obj; -struct _user_obj; -struct _db_obj; - typedef enum { - TSDB_ACCT_USER, - TSDB_ACCT_DB, - TSDB_ACCT_TABLE + ACCT_GRANT_USER, + ACCT_GRANT_DB, + ACCT_GRANT_TABLE } EAcctGrantType; int32_t acctInit(); void acctCleanUp(); -void *acctGetAcct(char *acctName); -void acctIncRef(struct _acct_obj *pAcct); -void acctReleaseAcct(struct _acct_obj *pAcct); -int32_t acctCheck(struct _acct_obj *pAcct, EAcctGrantType type); - -void acctAddDb(struct _acct_obj *pAcct, struct _db_obj *pDb); -void acctRemoveDb(struct _acct_obj *pAcct, struct _db_obj *pDb); -void acctAddUser(struct _acct_obj *pAcct, struct _user_obj *pUser); -void acctRemoveUser(struct _acct_obj *pAcct, struct _user_obj *pUser); +int32_t acctCheck(void *pAcct, EAcctGrantType type); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtAcct.h b/src/mnode/inc/mgmtAcct.h new file mode 100644 index 0000000000..32d7aa7bfb --- /dev/null +++ b/src/mnode/inc/mgmtAcct.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_MGMT_ACCT_H +#define TDENGINE_MGMT_ACCT_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tacct.h" + +struct _acct_obj; +struct _user_obj; +struct _db_obj; + +int32_t mgmtInitAccts(); +void mgmtCleanUpAccts(); +void *mgmtGetAcct(char *acctName); +void mgmtIncAcctRef(struct _acct_obj *pAcct); +void mgmtDecAcctRef(struct _acct_obj *pAcct); + +void mgmtAddDbToAcct(struct _acct_obj *pAcct, struct _db_obj *pDb); +void mgmtDropDbFromAcct(struct _acct_obj *pAcct, struct _db_obj *pDb); +void mgmtAddUserToAcct(struct _acct_obj *pAcct, struct _user_obj *pUser); +void mgmtDropUserFromAcct(struct _acct_obj *pAcct, struct _user_obj *pUser); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/mnode/inc/mgmtDb.h b/src/mnode/inc/mgmtDb.h index 22c92bff6d..0479b274bb 100644 --- a/src/mnode/inc/mgmtDb.h +++ b/src/mnode/inc/mgmtDb.h @@ -33,7 +33,7 @@ void mgmtCleanUpDbs(); SDbObj *mgmtGetDb(char *db); SDbObj *mgmtGetDbByTableId(char *db); void mgmtIncDbRef(SDbObj *pDb); -void mgmtReleaseDb(SDbObj *pDb); +void mgmtDecDbRef(SDbObj *pDb); bool mgmtCheckIsMonitorDB(char *db, char *monitordb); void mgmtDropAllDbs(SAcctObj *pAcct); diff --git a/src/inc/tcluster.h b/src/mnode/inc/mgmtDnode.h similarity index 60% rename from src/inc/tcluster.h rename to src/mnode/inc/mgmtDnode.h index a56285fe1c..f964222960 100644 --- a/src/inc/tcluster.h +++ b/src/mnode/inc/mgmtDnode.h @@ -33,21 +33,20 @@ enum _TAOS_DN_STATUS { TAOS_DN_STATUS_READY }; -int32_t clusterInit(); -void clusterCleanUp(); -char* clusterGetDnodeStatusStr(int32_t dnodeStatus); -bool clusterCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType); -void clusterMonitorDnodeModule(); - -int32_t clusterInitDnodes(); -void clusterCleanupDnodes(); -int32_t clusterGetDnodesNum(); -void * clusterGetNextDnode(void *pNode, struct _dnode_obj **pDnode); -void clusterReleaseDnode(struct _dnode_obj *pDnode); -void * clusterGetDnode(int32_t dnodeId); -void * clusterGetDnodeByIp(uint32_t ip); -void clusterUpdateDnode(struct _dnode_obj *pDnode); -int32_t clusterDropDnode(struct _dnode_obj *pDnode); +int32_t mgmtInitDnodes(); +void mgmtCleanupDnodes(); + +char* mgmtGetDnodeStatusStr(int32_t dnodeStatus); +bool mgmtCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType); +void mgmtMonitorDnodeModule(); + +int32_t mgmtGetDnodesNum(); +void * mgmtGetNextDnode(void *pNode, struct _dnode_obj **pDnode); +void mgmtReleaseDnode(struct _dnode_obj *pDnode); +void * mgmtGetDnode(int32_t dnodeId); +void * mgmtGetDnodeByIp(uint32_t ip); +void mgmtUpdateDnode(struct _dnode_obj *pDnode); +int32_t mgmtDropDnode(struct _dnode_obj *pDnode); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index a22313c52e..4741888e60 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -16,49 +16,178 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "dnode.h" #include "mnode.h" -#include "taccount.h" +#include "mgmtAcct.h" #include "mgmtDb.h" +#include "mgmtSdb.h" #include "mgmtUser.h" -#ifndef _ACCOUNT +static void * tsAcctSdb = NULL; +static int32_t tsAcctUpdateSize; +static void mgmtCreateRootAcct(); -static SAcctObj tsAcctObj = {0}; +static int32_t mgmtActionAcctDestroy(SSdbOperDesc *pOper) { + SAcctObj *pAcct = pOper->pObj; + pthread_mutex_destroy(&pAcct->mutex); + tfree(pOper->pObj); + return TSDB_CODE_SUCCESS; +} -int32_t acctInit() { - tsAcctObj.acctId = 0; - strcpy(tsAcctObj.user, "root"); +static int32_t mgmtAcctActionInsert(SSdbOperDesc *pOper) { + SAcctObj *pAcct = pOper->pObj; + memset(&pAcct->acctInfo, 0, sizeof(SAcctInfo)); + pthread_mutex_init(&pAcct->mutex, NULL); return TSDB_CODE_SUCCESS; } -void acctCleanUp() {} -void *acctGetAcct(char *acctName) { return &tsAcctObj; } -void acctIncRef(struct _acct_obj *pAcct) {} -void acctReleaseAcct(SAcctObj *pAcct) {} -int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; } +static int32_t mgmtActionAcctDelete(SSdbOperDesc *pOper) { + SAcctObj *pAcct = pOper->pObj; + mgmtDropAllUsers(pAcct); + mgmtDropAllDbs(pAcct); + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtActionAcctUpdate(SSdbOperDesc *pOper) { + SAcctObj *pAcct = pOper->pObj; + SAcctObj *pSaved = mgmtGetAcct(pAcct->user); + if (pAcct != pSaved) { + memcpy(pSaved, pAcct, tsAcctUpdateSize); + free(pAcct); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtActionActionEncode(SSdbOperDesc *pOper) { + SAcctObj *pAcct = pOper->pObj; + memcpy(pOper->rowData, pAcct, tsAcctUpdateSize); + pOper->rowSize = tsAcctUpdateSize; + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtActionAcctDecode(SSdbOperDesc *pOper) { + SAcctObj *pAcct = (SAcctObj *) calloc(1, sizeof(SAcctObj)); + if (pAcct == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; + + memcpy(pAcct, pOper->rowData, tsAcctUpdateSize); + pOper->pObj = pAcct; + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtActionAcctRestored() { + if (dnodeIsFirstDeploy()) { + mgmtCreateRootAcct(); + } + return TSDB_CODE_SUCCESS; +} + +int32_t mgmtInitAccts() { + SAcctObj tObj; + tsAcctUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + + SSdbTableDesc tableDesc = { + .tableId = SDB_TABLE_ACCOUNT, + .tableName = "accounts", + .hashSessions = TSDB_MAX_ACCOUNTS, + .maxRowSize = tsAcctUpdateSize, + .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .keyType = SDB_KEY_STRING, + .insertFp = mgmtAcctActionInsert, + .deleteFp = mgmtActionAcctDelete, + .updateFp = mgmtActionAcctUpdate, + .encodeFp = mgmtActionActionEncode, + .decodeFp = mgmtActionAcctDecode, + .destroyFp = mgmtActionAcctDestroy, + .restoredFp = mgmtActionAcctRestored + }; -#endif + tsAcctSdb = sdbOpenTable(&tableDesc); + if (tsAcctSdb == NULL) { + mError("failed to init acct data"); + return -1; + } -void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) { + mTrace("account table is created"); + return acctInit(); +} + +void mgmtCleanUpAccts() { + sdbCloseTable(tsAcctSdb); + acctCleanUp(); +} + +void *mgmtGetAcct(char *name) { + return sdbGetRow(tsAcctSdb, name); +} + +void mgmtIncAcctRef(SAcctObj *pAcct) { + sdbIncRef(tsAcctSdb, pAcct); +} + +void mgmtDecAcctRef(SAcctObj *pAcct) { + sdbDecRef(tsAcctSdb, pAcct); +} + +void mgmtAddDbToAcct(SAcctObj *pAcct, SDbObj *pDb) { atomic_add_fetch_32(&pAcct->acctInfo.numOfDbs, 1); pDb->pAcct = pAcct; - acctIncRef(pAcct); + mgmtIncAcctRef(pAcct); } -void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) { +void mgmtDropDbFromAcct(SAcctObj *pAcct, SDbObj *pDb) { atomic_sub_fetch_32(&pAcct->acctInfo.numOfDbs, 1); pDb->pAcct = NULL; - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); } -void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { +void mgmtAddUserToAcct(SAcctObj *pAcct, SUserObj *pUser) { atomic_add_fetch_32(&pAcct->acctInfo.numOfUsers, 1); pUser->pAcct = pAcct; - acctIncRef(pAcct); + mgmtIncAcctRef(pAcct); } -void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) { +void mgmtDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser) { atomic_sub_fetch_32(&pAcct->acctInfo.numOfUsers, 1); pUser->pAcct = NULL; - acctReleaseAcct(pAcct); -} \ No newline at end of file + mgmtDecAcctRef(pAcct); +} + +static void mgmtCreateRootAcct() { + int32_t numOfAccts = sdbGetNumOfRows(tsAcctSdb); + if (numOfAccts != 0) return; + + SAcctObj *pAcct = malloc(sizeof(SAcctObj)); + memset(pAcct, 0, sizeof(SAcctObj)); + strcpy(pAcct->user, "root"); + taosEncryptPass((uint8_t*)"taosdata", strlen("taosdata"), pAcct->pass); + pAcct->cfg = (SAcctCfg){ + .maxUsers = 10, + .maxDbs = 64, + .maxTimeSeries = INT32_MAX, + .maxConnections = 1024, + .maxStreams = 1000, + .maxPointsPerSecond = 10000000, + .maxStorage = INT64_MAX, + .maxQueryTime = INT64_MAX, + .maxInbound = 0, + .maxOutbound = 0, + .accessState = TSDB_VN_ALL_ACCCESS + }; + pAcct->acctId = sdbGetId(tsAcctSdb); + pAcct->createdTime = taosGetTimestampMs(); + + SSdbOperDesc oper = { + .type = SDB_OPER_GLOBAL, + .table = tsAcctSdb, + .pObj = pAcct, + }; + sdbInsertRow(&oper); +} + +#ifndef _ACCT + +int32_t acctInit() { return TSDB_CODE_SUCCESS; } +void acctCleanUp() {} +int32_t acctCheck(void *pAcct, EAcctGrantType type) { return TSDB_CODE_SUCCESS; } + +#endif \ No newline at end of file diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 77d68f43d8..0b9e025acf 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "tbalance.h" #include "mnode.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "mgmtVgroup.h" #ifndef _VPEER @@ -31,17 +31,17 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) { float vnodeUsage = 1.0; while (1) { - pNode = clusterGetNextDnode(pNode, &pDnode); + pNode = mgmtGetNextDnode(pNode, &pDnode); if (pDnode == NULL) break; - if (pDnode->numOfTotalVnodes > 0 && pDnode->openVnodes < pDnode->numOfTotalVnodes) { - float usage = (float)pDnode->openVnodes / pDnode->numOfTotalVnodes; + if (pDnode->totalVnodes > 0 && pDnode->openVnodes < pDnode->totalVnodes) { + float usage = (float)pDnode->openVnodes / pDnode->totalVnodes; if (usage <= vnodeUsage) { pSelDnode = pDnode; vnodeUsage = usage; } } - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); } if (pSelDnode == NULL) { diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c index b402a85005..699a1551d4 100644 --- a/src/mnode/src/mgmtDClient.c +++ b/src/mnode/src/mgmtDClient.c @@ -23,7 +23,7 @@ #include "mnode.h" #include "tbalance.h" #include "mgmtDb.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "tgrant.h" #include "mgmtProfile.h" #include "mgmtShell.h" diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index c38fe8af3e..99bcc365ae 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -19,10 +19,10 @@ #include "tutil.h" #include "name.h" #include "mnode.h" -#include "taccount.h" #include "tbalance.h" +#include "mgmtAcct.h" #include "mgmtDb.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "tgrant.h" #include "mpeer.h" #include "mgmtShell.h" @@ -51,7 +51,7 @@ static int32_t mgmtDbActionDestroy(SSdbOperDesc *pOper) { static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { SDbObj *pDb = pOper->pObj; - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); pDb->pHead = NULL; pDb->pTail = NULL; @@ -60,7 +60,7 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { pDb->numOfSuperTables = 0; if (pAcct != NULL) { - acctAddDb(pAcct, pDb); + mgmtAddDbToAcct(pAcct, pDb); } else { mError("db:%s, acct:%s info not exist in sdb", pDb->name, pDb->cfg.acct); @@ -72,9 +72,9 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { static int32_t mgmtDbActionDelete(SSdbOperDesc *pOper) { SDbObj *pDb = pOper->pObj; - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); - acctRemoveDb(pAcct, pDb); + mgmtDropDbFromAcct(pAcct, pDb); mgmtDropAllChildTables(pDb); mgmtDropAllSuperTables(pDb); mgmtDropAllVgroups(pDb); @@ -156,7 +156,7 @@ void mgmtIncDbRef(SDbObj *pDb) { return sdbIncRef(tsDbSdb, pDb); } -void mgmtReleaseDb(SDbObj *pDb) { +void mgmtDecDbRef(SDbObj *pDb) { return sdbDecRef(tsDbSdb, pDb); } @@ -288,14 +288,14 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) { } static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { - int32_t code = acctCheck(pAcct, TSDB_ACCT_DB); + int32_t code = acctCheck(pAcct, ACCT_GRANT_DB); if (code != 0) { return code; } SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb != NULL) { - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return TSDB_CODE_DB_ALREADY_EXIST; } @@ -641,7 +641,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * cols++; numOfRows++; - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); } pShow->numOfReads += numOfRows; @@ -888,7 +888,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) { mgmtSetDbDropping(pDb); numOfDbs++; } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); } mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index d13d37586a..97e9a89c8c 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -17,91 +17,209 @@ #include "os.h" #include "tmodule.h" #include "tbalance.h" -#include "tcluster.h" +#include "tgrant.h" +#include "mgmtDnode.h" #include "mnode.h" #include "mpeer.h" #include "mgmtDClient.h" -#include "mgmtShell.h" #include "mgmtDServer.h" +#include "mgmtSdb.h" +#include "mgmtShell.h" #include "mgmtUser.h" #include "mgmtVgroup.h" +#include "dnodeMClient.h" + +void *tsDnodeSdb = NULL; +int32_t tsDnodeUpdateSize = 0; +extern void * tsVgroupSdb; + +static int32_t mgmtCreateDnode(uint32_t ip); +static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg); +static void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg); +static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ; +static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg); +static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +static int32_t mgmtDnodeActionDestroy(SSdbOperDesc *pOper) { + tfree(pOper->pObj); + return TSDB_CODE_SUCCESS; +} -static void clusterProcessCfgDnodeMsg(SQueuedMsg *pMsg); -static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) ; -static void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg); -static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t clusterGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t clusterRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t clusterRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); - -#ifndef _CLUSTER - -static SDnodeObj tsDnodeObj = {0}; - -int32_t clusterInitDnodes() { - tsDnodeObj.dnodeId = 1; - tsDnodeObj.privateIp = inet_addr(tsPrivateIp); - tsDnodeObj.publicIp = inet_addr(tsPublicIp); - tsDnodeObj.createdTime = taosGetTimestampMs(); - tsDnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes; - tsDnodeObj.status = TAOS_DN_STATUS_OFFLINE; - tsDnodeObj.lastReboot = taosGetTimestampSec(); - sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId); - - tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT); - if (tsEnableHttpModule) { - tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_HTTP); - } - if (tsEnableMonitorModule) { - tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MONITOR); +static int32_t mgmtDnodeActionInsert(SSdbOperDesc *pOper) { + SDnodeObj *pDnode = pOper->pObj; + if (pDnode->status != TAOS_DN_STATUS_DROPPING) { + pDnode->status = TAOS_DN_STATUS_OFFLINE; } - return 0; + + return TSDB_CODE_SUCCESS; } -void *clusterGetNextDnode(void *pNode, SDnodeObj **pDnode) { - if (*pDnode == NULL) { - *pDnode = &tsDnodeObj; - } else { - *pDnode = NULL; +static int32_t mgmtDnodeActionDelete(SSdbOperDesc *pOper) { + SDnodeObj *pDnode = pOper->pObj; + void * pNode = NULL; + void * pLastNode = NULL; + SVgObj * pVgroup = NULL; + int32_t numOfVgroups = 0; + + while (1) { + pLastNode = pNode; + pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); + if (pVgroup == NULL) break; + + if (pVgroup->vnodeGid[0].dnodeId == pDnode->dnodeId) { + SSdbOperDesc oper = { + .type = SDB_OPER_LOCAL, + .table = tsVgroupSdb, + .pObj = pVgroup, + }; + sdbDeleteRow(&oper); + pNode = pLastNode; + numOfVgroups++; + continue; + } + } + + mTrace("dnode:%d, all vgroups:%d is dropped from sdb", pDnode->dnodeId, numOfVgroups); + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtDnodeActionUpdate(SSdbOperDesc *pOper) { + SDnodeObj *pDnode = pOper->pObj; + SDnodeObj *pSaved = mgmtGetDnode(pDnode->dnodeId); + if (pDnode != pSaved) { + memcpy(pSaved, pDnode, pOper->rowSize); + free(pDnode); } - return *pDnode; + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtDnodeActionEncode(SSdbOperDesc *pOper) { + SDnodeObj *pDnode = pOper->pObj; + memcpy(pOper->rowData, pDnode, tsDnodeUpdateSize); + pOper->rowSize = tsDnodeUpdateSize; + return TSDB_CODE_SUCCESS; } -void clusterCleanupDnodes() {} -int32_t clusterGetDnodesNum() { return 1; } -void * clusterGetDnode(int32_t dnodeId) { return dnodeId == 1 ? &tsDnodeObj : NULL; } -void * clusterGetDnodeByIp(uint32_t ip) { return &tsDnodeObj; } -void clusterReleaseDnode(struct _dnode_obj *pDnode) {} -void clusterUpdateDnode(struct _dnode_obj *pDnode) {} -void clusterMonitorDnodeModule() {} +static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { + SDnodeObj *pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj)); + if (pDnode == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; -#endif + memcpy(pDnode, pOper->rowData, tsDnodeUpdateSize); + pOper->pObj = pDnode; + return TSDB_CODE_SUCCESS; +} -int32_t clusterInit() { - mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, clusterProcessCfgDnodeMsg); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, clusterProcessCfgDnodeMsgRsp); - mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, clusterProcessDnodeStatusMsg); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, clusterGetModuleMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, clusterRetrieveModules); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, clusterGetConfigMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, clusterRetrieveConfigs); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, clusterGetVnodeMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, clusterRetrieveVnodes); - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, clusterGetDnodeMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, clusterRetrieveDnodes); +static int32_t mgmtDnodeActionRestored() { + int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); + if (numOfRows <= 0) { + if (strcmp(tsMasterIp, tsPrivateIp) == 0) { + mgmtCreateDnode(inet_addr(tsPrivateIp)); + } + } + + return 0; +} + +int32_t mgmtInitDnodes() { + SDnodeObj tObj; + tsDnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + + SSdbTableDesc tableDesc = { + .tableId = SDB_TABLE_DNODE, + .tableName = "dnodes", + .hashSessions = TSDB_MAX_DNODES, + .maxRowSize = tsDnodeUpdateSize, + .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .keyType = SDB_KEY_AUTO, + .insertFp = mgmtDnodeActionInsert, + .deleteFp = mgmtDnodeActionDelete, + .updateFp = mgmtDnodeActionUpdate, + .encodeFp = mgmtDnodeActionEncode, + .decodeFp = mgmtDnodeActionDecode, + .destroyFp = mgmtDnodeActionDestroy, + .restoredFp = mgmtDnodeActionRestored + }; + + tsDnodeSdb = sdbOpenTable(&tableDesc); + if (tsDnodeSdb == NULL) { + mError("failed to init dnodes data"); + return -1; + } + + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DNODE, mgmtProcessCreateDnodeMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg); + mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); + mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtGetConfigMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_CONFIGS, mgmtRetrieveConfigs); + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VNODES, mgmtGetVnodeMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VNODES, mgmtRetrieveVnodes); + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_DNODE, mgmtGetDnodeMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_DNODE, mgmtRetrieveDnodes); - return clusterInitDnodes(); + mTrace("dnodes table is created"); + return 0; +} + +void mgmtCleanupDnodes() { + sdbCloseTable(tsDnodeSdb); +} + +void *mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) { + return sdbFetchRow(tsDnodeSdb, pNode, (void **)pDnode); +} + +int32_t mgmtGetDnodesNum() { + return sdbGetNumOfRows(tsDnodeSdb); +} + +void *mgmtGetDnode(int32_t dnodeId) { + return sdbGetRow(tsDnodeSdb, &dnodeId); +} + +void *mgmtGetDnodeByIp(uint32_t ip) { + SDnodeObj *pDnode = NULL; + void * pNode = NULL; + + while (1) { + pNode = sdbFetchRow(tsDnodeSdb, pNode, (void**)&pDnode); + if (pDnode == NULL) break; + if (ip == pDnode->privateIp) { + return pDnode; + } + mgmtReleaseDnode(pDnode); + } + + return NULL; +} + +void mgmtReleaseDnode(SDnodeObj *pDnode) { + sdbDecRef(tsDnodeSdb, pDnode); } -void clusterCleanUp() { - clusterCleanupDnodes(); +void mgmtUpdateDnode(SDnodeObj *pDnode) { + SSdbOperDesc oper = { + .type = SDB_OPER_GLOBAL, + .table = tsDnodeSdb, + .pObj = pDnode, + .rowSize = tsDnodeUpdateSize + }; + + sdbUpdateRow(&oper); } -void clusterProcessCfgDnodeMsg(SQueuedMsg *pMsg) { +void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SCMCfgDnodeMsg *pCmCfgDnode = pMsg->pCont; @@ -137,11 +255,11 @@ void clusterProcessCfgDnodeMsg(SQueuedMsg *pMsg) { rpcSendResponse(&rpcRsp); } -static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { +static void mgmtProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { mPrint("cfg vnode rsp is received, result:%s", tstrerror(rpcMsg->code)); } -void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { +void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { SDMStatusMsg *pStatus = rpcMsg->pCont; pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->privateIp = htonl(pStatus->privateIp); @@ -159,14 +277,14 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { - pDnode = clusterGetDnodeByIp(pStatus->privateIp); + pDnode = mgmtGetDnodeByIp(pStatus->privateIp); if (pDnode == NULL) { mTrace("dnode not created, privateIp:%s", taosIpStr(pStatus->privateIp)); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); return; } } else { - pDnode = clusterGetDnode(pStatus->dnodeId); + pDnode = mgmtGetDnode(pStatus->dnodeId); if (pDnode == NULL) { mError("dnode:%d, not exist, privateIp:%s", pStatus->dnodeId, taosIpStr(pStatus->privateIp)); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); @@ -180,7 +298,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pDnode->numOfCores = pStatus->numOfCores; pDnode->diskAvailable = pStatus->diskAvailable; pDnode->alternativeRole = pStatus->alternativeRole; - pDnode->numOfTotalVnodes = pStatus->numOfTotalVnodes; + pDnode->totalVnodes = pStatus->numOfTotalVnodes; if (pStatus->dnodeId == 0) { mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); @@ -209,10 +327,10 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; balanceNotify(); - clusterMonitorDnodeModule(); + mgmtMonitorDnodeModule(); } - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); SDMStatusRsp *pRsp = rpcMallocCont(contLen); @@ -242,7 +360,123 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { rpcSendResponse(&rpcRsp); } -static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mgmtCreateDnode(uint32_t ip) { + int32_t grantCode = grantCheck(TSDB_GRANT_DNODE); + if (grantCode != TSDB_CODE_SUCCESS) { + return grantCode; + } + + SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); + if (pDnode != NULL) { + mError("dnode:%d is alredy exist, ip:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp)); + return TSDB_CODE_DNODE_ALREADY_EXIST; + } + + pDnode = (SDnodeObj *) calloc(1, sizeof(SDnodeObj)); + pDnode->privateIp = ip; + pDnode->publicIp = ip; + pDnode->createdTime = taosGetTimestampMs(); + pDnode->status = TAOS_DN_STATUS_OFFLINE; + pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM; + sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1); + + if (pDnode->privateIp == inet_addr(tsMasterIp)) { + pDnode->moduleStatus |= (1 << TSDB_MOD_MGMT); + } + + SSdbOperDesc oper = { + .type = SDB_OPER_GLOBAL, + .table = tsDnodeSdb, + .pObj = pDnode, + .rowSize = sizeof(SDnodeObj) + }; + + int32_t code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + tfree(pDnode); + code = TSDB_CODE_SDB_ERROR; + } + + mPrint("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code)); + return code; +} + +int32_t mgmtDropDnode(SDnodeObj *pDnode) { + SSdbOperDesc oper = { + .type = SDB_OPER_GLOBAL, + .table = tsDnodeSdb, + .pObj = pDnode + }; + + int32_t code = sdbDeleteRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_SDB_ERROR; + } + + mLPrint("dnode:%d is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); + return code; +} + +static int32_t clusterDropDnodeByIp(uint32_t ip) { + SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); + if (pDnode == NULL) { + mError("dnode:%s, is not exist", taosIpStr(ip)); + return TSDB_CODE_INVALID_VALUE; + } + + if (pDnode->privateIp == dnodeGetMnodeMasteIp()) { + mError("dnode:%d, can't drop dnode which is master", pDnode->dnodeId); + return TSDB_CODE_NO_REMOVE_MASTER; + } + +#ifndef _VPEER + return mgmtDropDnode(pDnode); +#else + return balanceDropDnode(pDnode); +#endif +} + +static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + SCMCreateDnodeMsg *pCreate = pMsg->pCont; + + if (strcmp(pMsg->pUser->pAcct->user, "root") != 0) { + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + } else { + uint32_t ip = inet_addr(pCreate->ip); + rpcRsp.code = mgmtCreateDnode(ip); + if (rpcRsp.code == TSDB_CODE_SUCCESS) { + SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); + mLPrint("dnode:%d, ip:%s is created by %s", pDnode->dnodeId, pCreate->ip, pMsg->pUser->user); + } else { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(rpcRsp.code)); + } + } + rpcSendResponse(&rpcRsp); +} + + +static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { + SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; + + SCMDropDnodeMsg *pDrop = pMsg->pCont; + if (strcmp(pMsg->pUser->pAcct->user, "root") != 0) { + rpcRsp.code = TSDB_CODE_NO_RIGHTS; + } else { + uint32_t ip = inet_addr(pDrop->ip); + rpcRsp.code = clusterDropDnodeByIp(ip); + if (rpcRsp.code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s is dropped by %s", pDrop->ip, pMsg->pUser->user); + } else { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(rpcRsp.code)); + } + } + + rpcSendResponse(&rpcRsp); +} + +static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); if (pUser == NULL) return 0; @@ -309,7 +543,7 @@ static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void * pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } - pShow->numOfRows = clusterGetDnodesNum(); + pShow->numOfRows = mgmtGetDnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; @@ -318,7 +552,7 @@ static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void * return 0; } -static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; int32_t cols = 0; SDnodeObj *pDnode = NULL; @@ -326,7 +560,7 @@ static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, char ipstr[32]; while (numOfRows < rows) { - pShow->pNode = clusterGetNextDnode(pShow->pNode, &pDnode); + pShow->pNode = mgmtGetNextDnode(pShow->pNode, &pDnode); if (pDnode == NULL) break; cols = 0; @@ -350,7 +584,7 @@ static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, clusterGetDnodeStatusStr(pDnode->status)); + strcpy(pWrite, mgmtGetDnodeStatusStr(pDnode->status)); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -358,29 +592,29 @@ static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDnode->numOfTotalVnodes; + *(int16_t *)pWrite = pDnode->totalVnodes; cols++; #ifdef _VPEER pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, clusterGetDnodeStatusStr(pDnode->status)); + strcpy(pWrite, mgmtGetDnodeStatusStr(pDnode->status)); cols++; #endif numOfRows++; - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); } pShow->numOfReads += numOfRows; return numOfRows; } -bool clusterCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { +bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { uint32_t status = pDnode->moduleStatus & (1 << moduleType); return status > 0; } -static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); @@ -419,10 +653,10 @@ static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void pShow->numOfRows = 0; SDnodeObj *pDnode = NULL; while (1) { - pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); + pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { - if (clusterCheckModuleInDnode(pDnode, moduleType)) { + if (mgmtCheckModuleInDnode(pDnode, moduleType)) { pShow->numOfRows++; } } @@ -435,7 +669,7 @@ static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void return 0; } -int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SDnodeObj *pDnode = NULL; char * pWrite; @@ -443,12 +677,12 @@ int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void * char ipstr[20]; while (numOfRows < rows) { - clusterReleaseDnode(pDnode); - pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); + mgmtReleaseDnode(pDnode); + pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { - if (!clusterCheckModuleInDnode(pDnode, moduleType)) { + if (!mgmtCheckModuleInDnode(pDnode, moduleType)) { continue; } @@ -464,7 +698,7 @@ int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void * cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, clusterGetDnodeStatusStr(pDnode->status)); + strcpy(pWrite, mgmtGetDnodeStatusStr(pDnode->status)); cols++; numOfRows++; @@ -481,7 +715,7 @@ static bool clusterCheckConfigShow(SGlobalConfig *cfg) { return true; } -static int32_t clusterGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); @@ -523,7 +757,7 @@ static int32_t clusterGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void return 0; } -static int32_t clusterRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) { @@ -570,7 +804,7 @@ static int32_t clusterRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, return numOfRows; } -static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { +static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); if (pUser == NULL) return 0; @@ -599,7 +833,7 @@ static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void * SDnodeObj *pDnode = NULL; if (pShow->payloadLen > 0 ) { uint32_t ip = ip2uint(pShow->payload); - pDnode = clusterGetDnodeByIp(ip); + pDnode = mgmtGetDnodeByIp(ip); if (NULL == pDnode) { return TSDB_CODE_NODE_OFFLINE; } @@ -616,7 +850,7 @@ static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void * pShow->pNode = pDnode; } else { while (true) { - pShow->pNode = clusterGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); + pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; pShow->numOfRows += pDnode->openVnodes; @@ -627,13 +861,13 @@ static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void * } pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); mgmtReleaseUser(pUser); return 0; } -static int32_t clusterRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { +static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t numOfRows = 0; SDnodeObj *pDnode = NULL; char * pWrite; @@ -674,7 +908,7 @@ static int32_t clusterRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, return numOfRows; } -char* clusterGetDnodeStatusStr(int32_t dnodeStatus) { +char* mgmtGetDnodeStatusStr(int32_t dnodeStatus) { switch (dnodeStatus) { case TAOS_DN_STATUS_OFFLINE: return "offline"; case TAOS_DN_STATUS_DROPPING: return "dropping"; @@ -683,3 +917,155 @@ char* clusterGetDnodeStatusStr(int32_t dnodeStatus) { default: return "undefined"; } } + + +static void clusterSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { + pDnode->moduleStatus |= (1 << moduleType); + mgmtUpdateDnode(pDnode); + + if (moduleType == TSDB_MOD_MGMT) { + mpeerAddMnode(pDnode->dnodeId); + mPrint("dnode:%d, add it into mnode list", pDnode->dnodeId); + } +} + +static void clusterUnSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { + pDnode->moduleStatus &= ~(1 << moduleType); + mgmtUpdateDnode(pDnode); + + if (moduleType == TSDB_MOD_MGMT) { + mpeerRemoveMnode(pDnode->dnodeId); + mPrint("dnode:%d, remove it from mnode list", pDnode->dnodeId); + } +} + +static void clusterStopAllModuleInDnode(SDnodeObj *pDnode) { + for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) { + if (!mgmtCheckModuleInDnode(pDnode, moduleType)) { + continue; + } + + mPrint("dnode:%d, stop %s module for its offline or remove", pDnode->dnodeId, tsModule[moduleType].name); + clusterUnSetModuleInDnode(pDnode, moduleType); + } +} + +static void clusterStartModuleInAllDnodes(int32_t moduleType) { + void * pNode = NULL; + SDnodeObj *pDnode = NULL; + + while (1) { + pNode = mgmtGetNextDnode(pNode, &pDnode); + if (pDnode == NULL) break; + + if (!mgmtCheckModuleInDnode(pDnode, moduleType) + && pDnode->status != TAOS_DN_STATUS_OFFLINE + && pDnode->status != TAOS_DN_STATUS_DROPPING) { + mPrint("dnode:%d, add %s module for schedule", pDnode->dnodeId, tsModule[moduleType].name); + clusterSetModuleInDnode(pDnode, moduleType); + } + + mgmtReleaseDnode(pNode); + } +} + +static void clusterStartModuleInOneDnode(int32_t moduleType) { + void * pNode = NULL; + SDnodeObj *pDnode = NULL; + + while (1) { + pNode = mgmtGetNextDnode(pNode, &pDnode); + if (pDnode == NULL) break; + + if (!mgmtCheckModuleInDnode(pDnode, moduleType) + && pDnode->status != TAOS_DN_STATUS_OFFLINE + && pDnode->status != TAOS_DN_STATUS_DROPPING + && !(moduleType == TSDB_MOD_MGMT && pDnode->alternativeRole == TSDB_DNODE_ROLE_VNODE)) { + mPrint("dnode:%d, add %s module for schedule", pDnode->dnodeId, tsModule[moduleType].name); + clusterSetModuleInDnode(pDnode, moduleType); + mgmtReleaseDnode(pNode); + break; + } + + mgmtReleaseDnode(pNode); + } +} + +static void clusterStopModuleInOneDnode(int32_t moduleType) { + void * pNode = NULL; + SDnodeObj *pDnode = NULL; + + while (1) { + pNode = mgmtGetNextDnode(pNode, &pDnode); + if (pDnode == NULL) break; + + if (mgmtCheckModuleInDnode(pDnode, moduleType)) { + mPrint("dnode:%d, stop %s module for schedule", pDnode->dnodeId, tsModule[moduleType].name); + clusterUnSetModuleInDnode(pDnode, moduleType); + mgmtReleaseDnode(pNode); + break; + } + + mgmtReleaseDnode(pNode); + } +} + +void mgmtMonitorDnodeModule() { + void * pNode = NULL; + SDnodeObj *pDnode = NULL; + int32_t onlineDnodes = 0; + + for (int32_t moduleType = 0; moduleType < TSDB_MOD_MGMT+1; ++moduleType) { + tsModule[moduleType].curNum = 0; + } + + // dnode loop + while (1) { + pNode = mgmtGetNextDnode(pNode, &pDnode); + if (pDnode == NULL) break; + + if (pDnode->status == TAOS_DN_STATUS_DROPPING) { + mPrint("dnode:%d, status:%d, remove all modules for removing", pDnode->dnodeId, pDnode->status); + clusterStopAllModuleInDnode(pDnode); + mgmtReleaseDnode(pDnode); + continue; + } + + for (int32_t moduleType = 0; moduleType < TSDB_MOD_MGMT+1; ++moduleType) { + if (mgmtCheckModuleInDnode(pDnode, moduleType)) { + tsModule[moduleType].curNum ++; + } + } + + if (pDnode->status != TAOS_DN_STATUS_OFFLINE) { + onlineDnodes++; + } + + mgmtReleaseDnode(pDnode); + } + + for (int32_t moduleType = 0; moduleType < TSDB_MOD_MGMT+1; ++moduleType) { + if (tsModule[moduleType].num == -1) { + clusterStartModuleInAllDnodes(moduleType); + continue; + } + if (tsModule[moduleType].curNum < tsModule[moduleType].num) { + if (onlineDnodes <= tsModule[moduleType].curNum) { + continue; + } + mTrace("need add %s module, curNum:%d, expectNum:%d", tsModule[moduleType].name, tsModule[moduleType].curNum, + tsModule[moduleType].num); + for (int32_t i = tsModule[moduleType].curNum; i < tsModule[moduleType].num; ++i) { + clusterStartModuleInOneDnode(moduleType); + } + } else if (tsModule[moduleType].curNum > tsModule[moduleType].num) { + mTrace("need drop %s module, curNum:%d, expectNum:%d", tsModule[moduleType].name, tsModule[moduleType].curNum, + tsModule[moduleType].num); + for (int32_t i = tsModule[moduleType].num; i < tsModule[moduleType].curNum; ++i) { + clusterStopModuleInOneDnode(moduleType); + } + } else { + } + } +} + diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index b6fb1ba425..46721f4834 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -19,9 +19,9 @@ #include "tmodule.h" #include "tsched.h" #include "mnode.h" -#include "taccount.h" +#include "mgmtAcct.h" #include "tbalance.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "tgrant.h" #include "mpeer.h" #include "mgmtDb.h" @@ -74,7 +74,7 @@ int32_t mgmtStartSystem() { return -1; } - if (acctInit() < 0) { + if (mgmtInitAccts() < 0) { mError("failed to init accts"); return -1; } @@ -89,7 +89,7 @@ int32_t mgmtStartSystem() { return -1; } - if (clusterInit() < 0) { + if (mgmtInitDnodes() < 0) { mError("failed to init dnodes"); return -1; } @@ -160,9 +160,9 @@ void mgmtCleanUpSystem() { mgmtCleanUpTables(); mgmtCleanUpVgroups(); mgmtCleanUpDbs(); - clusterCleanUp(); + mgmtCleanupDnodes(); mgmtCleanUpUsers(); - acctCleanUp(); + mgmtCleanUpAccts(); sdbCleanUp(); taosTmrCleanUp(tsMgmtTmr); mPrint("mgmt is cleaned up"); diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index f7dec4656b..db1b764ca7 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosmsg.h" -#include "taccount.h" -#include "tcluster.h" +#include "mgmtAcct.h" +#include "mgmtDnode.h" #include "mgmtDb.h" #include "mpeer.h" #include "mgmtProfile.h" @@ -787,11 +787,11 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { if (pMsg != NULL) { rpcFreeCont(pMsg->pCont); if (pMsg->pUser) mgmtReleaseUser(pMsg->pUser); - if (pMsg->pDb) mgmtReleaseDb(pMsg->pDb); + if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); if (pMsg->pVgroup) mgmtReleaseVgroup(pMsg->pVgroup); if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); - if (pMsg->pAcct) acctReleaseAcct(pMsg->pAcct); - if (pMsg->pDnode) clusterReleaseDnode(pMsg->pDnode); + if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct); + if (pMsg->pDnode) mgmtReleaseDnode(pMsg->pDnode); free(pMsg); } } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 5010429db3..82eb2bae1e 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -22,10 +22,10 @@ #include "tsched.h" #include "dnode.h" #include "mnode.h" -#include "taccount.h" +#include "mgmtAcct.h" #include "tbalance.h" #include "mgmtDb.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "tgrant.h" #include "mpeer.h" #include "mgmtProfile.h" diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 10cd343a5b..65dfb06ad5 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -23,10 +23,10 @@ #include "taosmsg.h" #include "tscompression.h" #include "name.h" -#include "taccount.h" +#include "mgmtAcct.h" #include "mgmtDClient.h" #include "mgmtDb.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "mgmtDServer.h" #include "tgrant.h" #include "mpeer.h" @@ -101,14 +101,14 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) { mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); if (pTable->info.type == TSDB_CHILD_TABLE) { pTable->superTable = mgmtGetSuperTable(pTable->superTableId); @@ -143,14 +143,14 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) { mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct == NULL) { mError("ctable:%s, account:%s not exists", pTable->info.tableId, pDb->cfg.acct); return TSDB_CODE_INVALID_ACCT; } - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); if (pTable->info.type == TSDB_CHILD_TABLE) { grantRestore(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); @@ -258,7 +258,7 @@ static int32_t mgmtChildTableActionRestored() { pNode = pLastNode; continue; } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { @@ -401,7 +401,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) { if (pDb != NULL) { mgmtAddSuperTableIntoDb(pDb); } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return TSDB_CODE_SUCCESS; } @@ -413,7 +413,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) { mgmtRemoveSuperTableFromDb(pDb); mgmtDropAllChildTablesInStable((SSuperTableObj *)pStable); } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return TSDB_CODE_SUCCESS; } @@ -923,10 +923,10 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc pStable->numOfColumns += ncols; pStable->sversion++; - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) { pAcct->acctInfo.numOfTimeSeries += (ncols * pStable->numOfTables); - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); } SSdbOperDesc oper = { @@ -960,10 +960,10 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch int32_t schemaSize = sizeof(SSchema) * (pStable->numOfTags + pStable->numOfColumns); pStable->schema = realloc(pStable->schema, schemaSize); - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) { pAcct->acctInfo.numOfTimeSeries -= pStable->numOfTables; - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); } SSdbOperDesc oper = { @@ -1029,7 +1029,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, pShow->numOfRows = pDb->numOfSuperTables; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return 0; } @@ -1094,7 +1094,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return numOfRows; } @@ -1191,14 +1191,14 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { pRsp->vgroups[vg].vgId = htonl(vgId); for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) { - SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[vn].dnodeId); + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[vn].dnodeId); if (pDnode == NULL) break; pRsp->vgroups[vg].ipAddr[vn].ip = htonl(pDnode->privateIp); pRsp->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort); pRsp->vgroups[vg].numOfIps++; - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); } mgmtReleaseVgroup(pVgroup); @@ -1500,10 +1500,10 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc pTable->numOfColumns += ncols; pTable->sversion++; - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) { pAcct->acctInfo.numOfTimeSeries += ncols; - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); } SSdbOperDesc oper = { @@ -1534,10 +1534,10 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch pTable->numOfColumns--; pTable->sversion++; - SAcctObj *pAcct = acctGetAcct(pDb->cfg.acct); + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); if (pAcct != NULL) { pAcct->acctInfo.numOfTimeSeries--; - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); } SSdbOperDesc oper = { @@ -1600,7 +1600,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { } for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[i].dnodeId); + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); if (pDnode == NULL) break; if (usePublicIp) { pMeta->vgroup.ipAddr[i].ip = htonl(pDnode->publicIp); @@ -1610,7 +1610,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { pMeta->vgroup.ipAddr[i].port = htonl(tsDnodeShellPort); } pMeta->vgroup.numOfIps++; - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); } pMeta->vgroup.vgId = htonl(pVgroup->vgId); @@ -1730,7 +1730,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { } static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) { - SDnodeObj *pObj = clusterGetDnode(dnodeId); + SDnodeObj *pObj = mgmtGetDnode(dnodeId); SVgObj *pVgroup = mgmtGetVgroup(vnode); if (pObj == NULL || pVgroup == NULL) { @@ -1968,7 +1968,7 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void pShow->numOfRows = pDb->numOfTables; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return 0; } @@ -2045,7 +2045,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, const int32_t NUM_OF_COLUMNS = 4; mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return numOfRows; } diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 931272fb58..7a7f2999b3 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -18,7 +18,7 @@ #include "trpc.h" #include "ttime.h" #include "tutil.h" -#include "taccount.h" +#include "mgmtAcct.h" #include "tgrant.h" #include "mpeer.h" #include "mgmtSdb.h" @@ -40,10 +40,10 @@ static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) { static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { SUserObj *pUser = pOper->pObj; - SAcctObj *pAcct = acctGetAcct(pUser->acct); + SAcctObj *pAcct = mgmtGetAcct(pUser->acct); if (pAcct != NULL) { - acctAddUser(pAcct, pUser); + mgmtAddUserToAcct(pAcct, pUser); } else { mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct); @@ -55,10 +55,10 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) { SUserObj *pUser = pOper->pObj; - SAcctObj *pAcct = acctGetAcct(pUser->acct); + SAcctObj *pAcct = mgmtGetAcct(pUser->acct); if (pAcct != NULL) { - acctRemoveUser(pAcct, pUser); + mgmtDropUserFromAcct(pAcct, pUser); } return TSDB_CODE_SUCCESS; @@ -92,11 +92,11 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) { static int32_t mgmtUserActionRestored() { if (strcmp(tsMasterIp, tsPrivateIp) == 0) { - SAcctObj *pAcct = acctGetAcct("root"); + SAcctObj *pAcct = mgmtGetAcct("root"); mgmtCreateUser(pAcct, "root", "taosdata"); mgmtCreateUser(pAcct, "monitor", tsInternalPass); mgmtCreateUser(pAcct, "_root", tsInternalPass); - acctReleaseAcct(pAcct); + mgmtDecAcctRef(pAcct); } return 0; @@ -167,7 +167,7 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) { } int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) { - int32_t code = acctCheck(pAcct, TSDB_ACCT_USER); + int32_t code = acctCheck(pAcct, ACCT_GRANT_USER); if (code != 0) { return code; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 98d9b22d8e..ee9afd9586 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -19,7 +19,7 @@ #include "tlog.h" #include "tbalance.h" #include "tsync.h" -#include "tcluster.h" +#include "mgmtDnode.h" #include "mnode.h" #include "mgmtDb.h" #include "mgmtDClient.h" @@ -63,7 +63,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { if (pDb == NULL) { return TSDB_CODE_INVALID_DB; } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); pVgroup->pDb = pDb; pVgroup->prev = NULL; @@ -84,12 +84,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { } for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[i].dnodeId); + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); if (pDnode != NULL) { pVgroup->vnodeGid[i].privateIp = pDnode->privateIp; pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; atomic_add_fetch_32(&pDnode->openVnodes, 1); - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); } } @@ -106,14 +106,14 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) { mgmtRemoveVgroupFromDb(pVgroup); } - mgmtReleaseDb(pVgroup->pDb); + mgmtDecDbRef(pVgroup->pDb); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[i].dnodeId); + SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); if (pDnode) { atomic_sub_fetch_32(&pDnode->openVnodes, 1); } - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); } return TSDB_CODE_SUCCESS; @@ -381,18 +381,18 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->pNode = pVgroup; } - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return 0; } char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { - SDnodeObj *pDnode = clusterGetDnode(pVnode->dnodeId); + SDnodeObj *pDnode = mgmtGetDnode(pVnode->dnodeId); if (pDnode == NULL) { mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId); return "null"; } - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { return "offline"; @@ -467,7 +467,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo } pShow->numOfReads += numOfRows; - mgmtReleaseDb(pDb); + mgmtDecDbRef(pDb); return numOfRows; } @@ -676,13 +676,13 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->vgId = htonl(pCfg->vgId); - SDnodeObj *pDnode = clusterGetDnode(pCfg->dnodeId); + SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnodeId); if (pDnode == NULL) { mTrace("dnode:%s, invalid dnode", taosIpStr(pCfg->dnodeId), pCfg->vgId); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); return; } - clusterReleaseDnode(pDnode); + mgmtReleaseDnode(pDnode); SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId); if (pVgroup == NULL) { -- GitLab