diff --git a/src/mnode/inc/mgmtDb.h b/src/mnode/inc/mgmtDb.h index 920217b9b8e0cc2d96acfb9fc26d6712cee1f5f1..b00a2bdf3de532755576fd308bf49877b9956b43 100644 --- a/src/mnode/inc/mgmtDb.h +++ b/src/mnode/inc/mgmtDb.h @@ -32,6 +32,7 @@ int32_t mgmtInitDbs(); void mgmtCleanUpDbs(); SDbObj *mgmtGetDb(char *db); SDbObj *mgmtGetDbByTableId(char *db); +void * mgmtGetNextDb(void *pNode, SDbObj **pDb); void mgmtIncDbRef(SDbObj *pDb); void mgmtDecDbRef(SDbObj *pDb); bool mgmtCheckIsMonitorDB(char *db, char *monitordb); diff --git a/src/mnode/inc/mgmtDef.h b/src/mnode/inc/mgmtDef.h index c0fc3ea3d521fadf3d68c5519ab6f00d795f2cf4..34249d3f00cddfca1c73df0732386ba08c6a832f 100644 --- a/src/mnode/inc/mgmtDef.h +++ b/src/mnode/inc/mgmtDef.h @@ -237,7 +237,6 @@ typedef struct { typedef struct { uint8_t msgType; - int8_t usePublicIp; int8_t received; int8_t successed; int8_t expected; diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index cebbc061f65533f21e5aa6cf8b0936bc668f5e40..cb1d009c8c46962903316fb21b8f1c51fd032f76 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -33,16 +33,19 @@ void mgmtCleanupMnodes(); int32_t mgmtAddMnode(int32_t dnodeId); int32_t mgmtDropMnode(int32_t dnodeId); +void mgmtDropMnodeLocal(int32_t dnodeId); void * mgmtGetMnode(int32_t mnodeId); int32_t mgmtGetMnodesNum(); void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode); -void mgmtReleaseMnode(struct SMnodeObj *pMnode); +void mgmtIncMnodeRef(struct SMnodeObj *pMnode); +void mgmtDecMnodeRef(struct SMnodeObj *pMnode); char * mgmtGetMnodeRoleStr(); void mgmtGetMnodeIpSet(SRpcIpSet *ipSet); void mgmtGetMnodeInfos(void *mnodes); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 03d31d8e4bdf0ddf30b4c00140a32dbccbed2b91..9c4aa4a2a550bf0990cb5015716e6e91d9de4ea4 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -22,13 +22,15 @@ extern "C" { #include "mgmtDef.h" -int32_t mgmtInitTables(); -void mgmtCleanUpTables(); -STableObj* mgmtGetTable(char* tableId); -void mgmtIncTableRef(void *pTable); -void mgmtDecTableRef(void *pTable); -void mgmtDropAllChildTables(SDbObj *pDropDb); -void mgmtDropAllSuperTables(SDbObj *pDropDb); +int32_t mgmtInitTables(); +void mgmtCleanUpTables(); +void * mgmtGetTable(char *tableId); +void mgmtIncTableRef(void *pTable); +void mgmtDecTableRef(void *pTable); +void * mgmtGetNextChildTable(void *pNode, SChildTableObj **pTable); +void * mgmtGetNextSuperTable(void *pNode, SSuperTableObj **pTable); +void mgmtDropAllChildTables(SDbObj *pDropDb); +void mgmtDropAllSuperTables(SDbObj *pDropDb); #ifdef __cplusplus } diff --git a/src/mnode/inc/mgmtUser.h b/src/mnode/inc/mgmtUser.h index d0fd03de779e2e4e50fb31bd6633fe0c4824f9ac..0a21b1f7044db6349466fff8d5cc5a1bf7a5b80a 100644 --- a/src/mnode/inc/mgmtUser.h +++ b/src/mnode/inc/mgmtUser.h @@ -27,7 +27,7 @@ SUserObj *mgmtGetUser(char *name); void * mgmtGetNextUser(void *pNode, SUserObj **pUser); void mgmtIncUserRef(SUserObj *pUser); void mgmtDecUserRef(SUserObj *pUser); -SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp); +SUserObj *mgmtGetUserFromConn(void *pConn); int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass); void mgmtDropAllUsers(SAcctObj *pAcct); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 3f8927fbd05c215a863e2b14c083bffbb9549bcd..3f8dc35a00eb27f37714b3895fe28db6abf4fd99 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -32,7 +32,8 @@ void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); void mgmtIncVgroupRef(SVgObj *pVgroup); void mgmtDecVgroupRef(SVgObj *pVgroup); -void mgmtDropAllVgroups(SDbObj *pDropDb); +void mgmtDropAllDbVgroups(SDbObj *pDropDb); +void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode); void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index f9e2c8b105add14976f85b097038f8fce865c46b..9b7815af120cff534137328208502e215414c482 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -27,8 +27,8 @@ #include "mgmtUser.h" void * tsAcctSdb = NULL; -int32_t tsAcctUpdateSize; -static void mgmtCreateRootAcct(); +static int32_t tsAcctUpdateSize; +static void mgmtCreateRootAcct(); static int32_t mgmtActionAcctDestroy(SSdbOper *pOper) { SAcctObj *pAcct = pOper->pObj; diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index c6c10e0da00e426b9123b31040d8450e59d0d9bc..17d4a4114b6404670c320e2a3d1533754bdfa299 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -35,7 +35,7 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) { void * pNode = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; - float vnodeUsage = 1.0; + float vnodeUsage = 1000.0; while (1) { pNode = mgmtGetNextDnode(pNode, &pDnode); diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index a78fba22085afc671aab224a7d7e8afaa9b26ecf..3e7577af0696ef591217224d3e08403f16fd5097 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -36,7 +36,7 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -void * tsDbSdb = NULL; +static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); @@ -82,7 +82,7 @@ static int32_t mgmtDbActionDelete(SSdbOper *pOper) { mgmtDropDbFromAcct(pAcct, pDb); mgmtDropAllChildTables(pDb); mgmtDropAllSuperTables(pDb); - mgmtDropAllVgroups(pDb); + mgmtDropAllDbVgroups(pDb); mgmtDecAcctRef(pAcct); return TSDB_CODE_SUCCESS; @@ -95,6 +95,7 @@ static int32_t mgmtDbActionUpdate(SSdbOper *pOper) { memcpy(pSaved, pDb, pOper->rowSize); free(pDb); } + mgmtDecDbRef(pSaved); return TSDB_CODE_SUCCESS; } @@ -154,6 +155,10 @@ int32_t mgmtInitDbs() { return 0; } +void *mgmtGetNextDb(void *pNode, SDbObj **pDb) { + return sdbFetchRow(tsDbSdb, pNode, (void **)pDb); +} + SDbObj *mgmtGetDb(char *db) { return (SDbObj *)sdbGetRow(tsDbSdb, db); } @@ -174,7 +179,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) { memset(db, 0, sizeof(db)); strncpy(db, tableId, pos - tableId); - return (SDbObj *)sdbGetRow(tsDbSdb, db); + return mgmtGetDb(db); } static int32_t mgmtCheckDbCfg(SDbCfg *pCfg) { @@ -346,8 +351,27 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) { return (strncasecmp(dbName, monitordb, len) == 0 && len == strlen(monitordb)); } +#if 0 +void mgmtPrintVgroups(SDbObj *pDb, char *oper) { + mPrint("db:%s, vgroup link from head, oper:%s", pDb->name, oper); + SVgObj *pVgroup = pDb->pHead; + while (pVgroup != NULL) { + mPrint("vgId:%d", pVgroup->vgId); + pVgroup = pVgroup->next; + } + + mPrint("db:%s, vgroup link from tail", pDb->name, pDb->numOfVgroups); + pVgroup = pDb->pTail; + while (pVgroup != NULL) { + mPrint("vgId:%d", pVgroup->vgId); + pVgroup = pVgroup->prev; + } +} +#endif + void mgmtAddVgroupIntoDb(SVgObj *pVgroup) { SDbObj *pDb = pVgroup->pDb; + pVgroup->next = pDb->pHead; pVgroup->prev = NULL; @@ -397,7 +421,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) int32_t cols = 0; SSchema *pSchema = pMeta->schema; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; pShow->bytes[cols] = TSDB_DB_NAME_LEN; @@ -545,11 +569,11 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * SDbObj *pDb = NULL; char * pWrite; int32_t cols = 0; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; while (numOfRows < rows) { - pShow->pNode = sdbFetchRow(tsDbSdb, pShow->pNode, (void **) &pDb); + pShow->pNode = mgmtGetNextDb(pShow->pNode, &pDb); if (pDb == NULL) break; cols = 0; @@ -674,8 +698,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, - .pObj = pDb, - .rowSize = tsDbUpdateSize + .pObj = pDb }; int32_t code = sdbUpdateRow(&oper); @@ -803,8 +826,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, - .pObj = pDb, - .rowSize = tsDbUpdateSize + .pObj = pDb }; int32_t code = sdbUpdateRow(&oper); @@ -839,21 +861,21 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { return; } - SDbObj *pDb = pMsg->pDb = mgmtGetDb(pAlter->db); - if (pDb == NULL) { + if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pAlter->db); + if (pMsg->pDb == NULL) { mError("db:%s, failed to alter, invalid db", pAlter->db); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); return; } - int32_t code = mgmtAlterDb(pDb, pAlter); + int32_t code = mgmtAlterDb(pMsg->pDb, pAlter); if (code != TSDB_CODE_SUCCESS) { mError("db:%s, failed to alter, invalid db option", pAlter->db); mgmtSendSimpleResp(pMsg->thandle, code); return; } - mTrace("db:%s, all vgroups is altered", pDb->name); + mTrace("db:%s, all vgroups is altered", pMsg->pDb->name); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); } @@ -884,8 +906,8 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { return; } - SDbObj *pDb = pMsg->pDb = mgmtGetDb(pDrop->db); - if (pDb == NULL) { + if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pDrop->db); + if (pMsg->pDb == NULL) { if (pDrop->ignoreNotExists) { mTrace("db:%s, db is not exist, think drop success", pDrop->db); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); @@ -897,30 +919,32 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { } } - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { mError("db:%s, can't drop monitor database", pDrop->db); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); return; } - int32_t code = mgmtSetDbDropping(pDb); + int32_t code = mgmtSetDbDropping(pMsg->pDb); if (code != TSDB_CODE_SUCCESS) { mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); mgmtSendSimpleResp(pMsg->thandle, code); return; } - SVgObj *pVgroup = pDb->pHead; +#if 0 + SVgObj *pVgroup = pMsg->pDb->pHead; if (pVgroup != NULL) { - mPrint("vgroup:%d, will be dropped", pVgroup->vgId); + mPrint("vgId:%d, will be dropped", pVgroup->vgId); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pVgroup; newMsg->expected = pVgroup->numOfVnodes; mgmtDropVgroup(pVgroup, newMsg); return; } +#endif - mTrace("db:%s, all vgroups is dropped", pDb->name); + mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name); mgmtDropDb(pMsg); } @@ -932,7 +956,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) { mPrint("acct:%s, all dbs will be dropped from sdb", pAcct->user); while (1) { - pNode = sdbFetchRow(tsDbSdb, pNode, (void **)&pDb); + pNode = mgmtGetNextDb(pNode, &pDb); if (pDb == NULL) break; if (pDb->pAcct == pAcct) { diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 97360e49ea6c081106cae7cf1a2f31f692c353c1..c7643b9bf906988bc2668f8c393a38c117b866c7 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -36,9 +36,9 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -void *tsDnodeSdb = NULL; -int32_t tsDnodeUpdateSize = 0; int32_t tsAccessSquence = 0; +static void *tsDnodeSdb = NULL; +static int32_t tsDnodeUpdateSize = 0; extern void * tsMnodeSdb; extern void * tsVgroupSdb; @@ -73,39 +73,12 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) { static int32_t mgmtDnodeActionDelete(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; - void * pNode = NULL; - void * pLastNode = NULL; - SVgObj * pVgroup = NULL; - int32_t numOfVgroups = 0; - - while (1) { - pLastNode = pNode; - pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); - if (pVgroup == NULL) break; - - if (pVgroup->vnodeGid[0].dnodeId == pDnode->dnodeId) { - SSdbOper oper = { - .type = SDB_OPER_LOCAL, - .table = tsVgroupSdb, - .pObj = pVgroup, - }; - sdbDeleteRow(&oper); - pNode = pLastNode; - numOfVgroups++; - continue; - } - } - - SMnodeObj *pMnode = mgmtGetMnode(pDnode->dnodeId); - if (pMnode != NULL) { - SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode}; - sdbDeleteRow(&oper); - mgmtReleaseMnode(pMnode); - } - + + mgmtDropAllDnodeVgroups(pDnode); + mgmtDropMnodeLocal(pDnode->dnodeId); balanceNotify(); - mTrace("dnode:%d, all vgroups:%d is dropped from sdb", pDnode->dnodeId, numOfVgroups); + mTrace("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId); return TSDB_CODE_SUCCESS; } @@ -116,6 +89,7 @@ static int32_t mgmtDnodeActionUpdate(SSdbOper *pOper) { memcpy(pSaved, pDnode, pOper->rowSize); free(pDnode); } + mgmtDecDnodeRef(pSaved); return TSDB_CODE_SUCCESS; } @@ -212,7 +186,7 @@ void *mgmtGetDnodeByIp(char *ep) { void * pNode = NULL; while (1) { - pNode = sdbFetchRow(tsDnodeSdb, pNode, (void**)&pDnode); + pNode = mgmtGetNextDnode(pNode, &pDnode); if (pDnode == NULL) break; if (strcmp(ep, pDnode->dnodeEp) == 0) { return pDnode; @@ -235,8 +209,7 @@ void mgmtUpdateDnode(SDnodeObj *pDnode) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsDnodeSdb, - .pObj = pDnode, - .rowSize = tsDnodeUpdateSize + .pObj = pDnode }; sdbUpdateRow(&oper); @@ -336,7 +309,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId); if (pVgroup == NULL) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp); - mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId); + mPrint("dnode:%d, vgId:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId); mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); } else { mgmtUpdateVgroupStatus(pVgroup, pDnode, pVload); @@ -387,6 +360,7 @@ static int32_t mgmtCreateDnode(char *ep) { SDnodeObj *pDnode = mgmtGetDnodeByIp(ep); if (pDnode != NULL) { + mgmtDecDnodeRef(pDnode); mError("dnode:%d is alredy exist, %s:%d", pDnode->dnodeId, pDnode->dnodeFqdn, pDnode->dnodePort); return TSDB_CODE_DNODE_ALREADY_EXIST; } @@ -440,6 +414,7 @@ static int32_t mgmtDropDnodeByIp(char *ep) { return TSDB_CODE_DNODE_NOT_EXIST; } + mgmtDecDnodeRef(pDnode); if (strcmp(pDnode->dnodeEp, dnodeGetMnodeMasterEp()) == 0) { mError("dnode:%d, can't drop dnode:%s which is master", pDnode->dnodeId, ep); return TSDB_CODE_NO_REMOVE_MASTER; @@ -464,6 +439,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) { if (rpcRsp.code == TSDB_CODE_SUCCESS) { SDnodeObj *pDnode = mgmtGetDnodeByIp(pCreate->ep); mLPrint("dnode:%d, %s is created by %s", pDnode->dnodeId, pCreate->ep, pMsg->pUser->user); + mgmtDecDnodeRef(pDnode); } else { mError("failed to create dnode:%s, reason:%s", pCreate->ep, tstrerror(rpcRsp.code)); } @@ -492,7 +468,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { } static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; if (strcmp(pUser->pAcct->user, "root") != 0) { @@ -609,7 +585,7 @@ static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; if (strcmp(pUser->user, "root") != 0) { @@ -719,7 +695,7 @@ static bool mgmtCheckConfigShow(SGlobalCfg *cfg) { static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; if (strcmp(pUser->user, "root") != 0) { @@ -806,7 +782,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; if (strcmp(pUser->user, "root") != 0) { diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index ccc01fc304c5d1c4d8ca4b2924c6442c85c45924..940fdca4d8bcf8e735a12c73ea9b534578ff697e 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -30,7 +30,7 @@ #include "mgmtShell.h" #include "mgmtUser.h" -void * tsMnodeSdb = NULL; +static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -71,7 +71,7 @@ static int32_t mgmtMnodeActionUpdate(SSdbOper *pOper) { memcpy(pSaved, pMnode, pOper->rowSize); free(pMnode); } - + mgmtDecMnodeRef(pSaved); return TSDB_CODE_SUCCESS; } @@ -97,7 +97,7 @@ static int32_t mgmtMnodeActionRestored() { mgmtGetNextMnode(NULL, &pMnode); if (pMnode != NULL) { pMnode->role = TAOS_SYNC_ROLE_MASTER; - mgmtReleaseMnode(pMnode); + mgmtDecMnodeRef(pMnode); } } return TSDB_CODE_SUCCESS; @@ -148,7 +148,11 @@ void *mgmtGetMnode(int32_t mnodeId) { return sdbGetRow(tsMnodeSdb, &mnodeId); } -void mgmtReleaseMnode(SMnodeObj *pMnode) { +void mgmtIncMnodeRef(SMnodeObj *pMnode) { + sdbIncRef(tsMnodeSdb, pMnode); +} + +void mgmtDecMnodeRef(SMnodeObj *pMnode) { sdbDecRef(tsMnodeSdb, pMnode); } @@ -187,7 +191,7 @@ void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) { ipSet->numOfIps++; - mgmtReleaseMnode(pMnode); + mgmtDecMnodeRef(pMnode); } } @@ -209,7 +213,7 @@ void mgmtGetMnodeInfos(void *param) { } index++; - mgmtReleaseMnode(pMnode); + mgmtDecMnodeRef(pMnode); } mnodes->nodeNum = index; @@ -235,8 +239,17 @@ int32_t mgmtAddMnode(int32_t dnodeId) { return code; } +void mgmtDropMnodeLocal(int32_t dnodeId) { + SMnodeObj *pMnode = mgmtGetMnode(dnodeId); + if (pMnode != NULL) { + SSdbOper oper = {.type = SDB_OPER_LOCAL, .table = tsMnodeSdb, .pObj = pMnode}; + sdbDeleteRow(&oper); + mgmtDecMnodeRef(pMnode); + } +} + int32_t mgmtDropMnode(int32_t dnodeId) { - SMnodeObj *pMnode = sdbGetRow(tsMnodeSdb, &dnodeId); + SMnodeObj *pMnode = mgmtGetMnode(dnodeId); if (pMnode == NULL) { return TSDB_CODE_DNODE_NOT_EXIST; } @@ -258,7 +271,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) { static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { sdbUpdateMnodeRoles(); - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) return 0; if (strcmp(pUser->pAcct->user, "root") != 0) { @@ -339,7 +352,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi numOfRows++; - mgmtReleaseMnode(pMnode); + mgmtDecMnodeRef(pMnode); } pShow->numOfReads += numOfRows; diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 754be9a1d0c9614d3a9e0c53cfbd001d0fa67326..77871f037bcd5e39993febc83082c728e3b773fc 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -675,7 +675,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); @@ -699,7 +699,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); @@ -723,7 +723,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; rpcSendResponse(&rpcRsp); diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 26afc1c6f0696e8082da65fd5438833034c32e87..53b9d2b814630bba44185d8f7b01e6a29a24b306 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -184,7 +184,7 @@ void sdbUpdateMnodeRoles() { if (pMnode != NULL) { pMnode->role = roles.role[i]; sdbPrint("mnode:%d, role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role)); - mgmtReleaseMnode(pMnode); + mgmtDecMnodeRef(pMnode); } } } @@ -252,7 +252,7 @@ void sdbUpdateSync() { strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp); index++; - mgmtReleaseMnode(pMnode); + mgmtDecMnodeRef(pMnode); } } diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index fdf2ea695329ae425460a2f4a76dee698a90bc5e..193521b0260c42213f3d07578fa129a33c206f65 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -421,6 +421,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { code = TSDB_CODE_INVALID_DB; goto connect_over; } + mgmtDecDbRef(pDb); } SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); @@ -454,9 +455,8 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { SCMUseDbMsg *pUseDbMsg = pMsg->pCont; // todo check for priority of current user - pMsg->pDb = mgmtGetDb(pUseDbMsg->db); - int32_t code = TSDB_CODE_SUCCESS; + if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDb(pUseDbMsg->db); if (pMsg->pDb == NULL) { code = TSDB_CODE_INVALID_DB; } @@ -470,7 +470,7 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) { */ static bool mgmtCheckTableMetaMsgReadOnly(SQueuedMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; - pMsg->pTable = mgmtGetTable(pInfo->tableId); + if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pInfo->tableId); if (pMsg->pTable != NULL) return true; // If table does not exists and autoCreate flag is set, we add the handler into task queue @@ -551,8 +551,7 @@ void mgmtFreeQhandle(void *qhandle, bool forceRemove) { } void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { - bool usePublicIp = false; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle); if (pUser == NULL) { return NULL; } @@ -563,7 +562,6 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { pMsg->contLen = rpcMsg->contLen; pMsg->pCont = rpcMsg->pCont; pMsg->pUser = pUser; - pMsg->usePublicIp = usePublicIp; return pMsg; } @@ -591,8 +589,7 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { pDestMsg->retry = pSrcMsg->retry; pDestMsg->maxRetry= pSrcMsg->maxRetry; pDestMsg->pUser = pSrcMsg->pUser; - pDestMsg->usePublicIp = pSrcMsg->usePublicIp; - + pSrcMsg->pCont = NULL; pSrcMsg->pUser = NULL; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index bfbe497b4be19b8c8de700f4086fdb734b1916a1..a1d260d1cf1d86d2e92ebefe8ce5dd331c9e9d1f 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -41,8 +41,8 @@ #include "mgmtVgroup.h" #include "tcompare.h" -void * tsChildTableSdb; -void * tsSuperTableSdb; +static void * tsChildTableSdb; +static void * tsSuperTableSdb; static int32_t tsChildTableUpdateSize; static int32_t tsSuperTableUpdateSize; static void * mgmtGetChildTable(char *tableId); @@ -97,14 +97,14 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId); + mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId); return TSDB_CODE_INVALID_VGROUP_ID; } mgmtDecVgroupRef(pVgroup); SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); + mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } mgmtDecDbRef(pDb); @@ -117,6 +117,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) { mgmtDecAcctRef(pAcct); if (pTable->info.type == TSDB_CHILD_TABLE) { + // add ref pTable->superTable = mgmtGetSuperTable(pTable->superTableId); mgmtAddTableIntoStable(pTable->superTable, pTable); grantAdd(TSDB_GRANT_TIMESERIES, pTable->superTable->numOfColumns - 1); @@ -146,7 +147,7 @@ static int32_t mgmtChildTableActionDelete(SSdbOper *pOper) { SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { - mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); + mError("ctable:%s, vgId:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName); return TSDB_CODE_INVALID_DB; } mgmtDecDbRef(pDb); @@ -186,6 +187,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) { free(oldSql); free(oldSchema); } + mgmtDecTableRef(pTable); return TSDB_CODE_SUCCESS; } @@ -250,7 +252,7 @@ static int32_t mgmtChildTableActionRestored() { while (1) { pLastNode = pNode; mgmtDecTableRef(pTable); - pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + pNode = mgmtGetNextChildTable(pNode, &pTable); if (pTable == NULL) break; SDbObj *pDb = mgmtGetDbByTableId(pTable->info.tableId); @@ -268,7 +270,7 @@ static int32_t mgmtChildTableActionRestored() { SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); if (pVgroup == NULL) { - mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid); + mError("ctable:%s, failed to get vgId:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid); pTable->vgId = 0; SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; @@ -281,7 +283,7 @@ static int32_t mgmtChildTableActionRestored() { mgmtDecVgroupRef(pVgroup); if (strcmp(pVgroup->dbName, pDb->name) != 0) { - mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it", + mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it", pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid); pTable->vgId = 0; SSdbOper desc = {0}; @@ -294,7 +296,7 @@ static int32_t mgmtChildTableActionRestored() { } if (pVgroup->tableList == NULL) { - mError("ctable:%s, vgroup:%d tableList is null", pTable->info.tableId, pTable->vgId); + mError("ctable:%s, vgId:%d tableList is null", pTable->info.tableId, pTable->vgId); pTable->vgId = 0; SSdbOper desc = {0}; desc.type = SDB_OPER_LOCAL; @@ -435,7 +437,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) { free(pNew); free(oldSchema); } - + mgmtDecTableRef(pTable); return TSDB_CODE_SUCCESS; } @@ -558,20 +560,28 @@ static void *mgmtGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); } -STableObj *mgmtGetTable(char *tableId) { - STableObj *tableInfo = sdbGetRow(tsSuperTableSdb, tableId); - if (tableInfo != NULL) { - return tableInfo; +void *mgmtGetTable(char *tableId) { + void *pTable = mgmtGetSuperTable(tableId); + if (pTable != NULL) { + return pTable; } - tableInfo = sdbGetRow(tsChildTableSdb, tableId); - if (tableInfo != NULL) { - return tableInfo; + pTable = mgmtGetChildTable(tableId); + if (pTable != NULL) { + return pTable; } return NULL; } +void *mgmtGetNextChildTable(void *pNode, SChildTableObj **pTable) { + return sdbFetchRow(tsChildTableSdb, pNode, (void **)pTable); +} + +void *mgmtGetNextSuperTable(void *pNode, SSuperTableObj **pTable) { + return sdbFetchRow(tsSuperTableSdb, pNode, (void **)pTable); +} + void mgmtIncTableRef(void *p1) { STableObj *pTable = (STableObj *)p1; if (pTable->type == TSDB_SUPER_TABLE) { @@ -787,8 +797,6 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { mgmtDecVgroupRef(pVgroup); } } - //mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables); - //mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); } else { SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -846,8 +854,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable, - .rowSize = tsSuperTableUpdateSize + .pObj = pStable }; int32_t code = sdbUpdateRow(&oper); @@ -874,8 +881,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable, - .rowSize = tsSuperTableUpdateSize + .pObj = pStable }; int32_t code = sdbUpdateRow(&oper); @@ -911,8 +917,7 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable, - .rowSize = tsSuperTableUpdateSize + .pObj = pStable }; int32_t code = sdbUpdateRow(&oper); @@ -977,8 +982,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable, - .rowSize = tsSuperTableUpdateSize + .pObj = pStable }; int32_t code = sdbUpdateRow(&oper); @@ -1015,8 +1019,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsSuperTableSdb, - .pObj = pStable, - .rowSize = tsSuperTableUpdateSize + .pObj = pStable }; int32_t code = sdbUpdateRow(&oper); @@ -1099,7 +1102,8 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v char stableName[TSDB_TABLE_NAME_LEN] = {0}; while (numOfRows < rows) { - pShow->pNode = sdbFetchRow(tsSuperTableSdb, pShow->pNode, (void **) &pTable); + mgmtDecTableRef(pTable); + pShow->pNode = mgmtGetNextSuperTable(pShow->pNode, &pTable); if (pTable == NULL) break; if (strncmp(pTable->info.tableId, prefix, prefixLen)) { continue; @@ -1135,8 +1139,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v cols++; numOfRows++; - mgmtDecTableRef(pTable); - } pShow->numOfReads += numOfRows; @@ -1155,7 +1157,8 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) { mPrint("db:%s, all super tables will be dropped from sdb", pDropDb->name); while (1) { - pNode = sdbFetchRow(tsSuperTableSdb, pNode, (void **)&pTable); + pLastNode = pNode; + pNode = mgmtGetNextSuperTable(pNode, &pTable); if (pTable == NULL) break; if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) { @@ -1213,14 +1216,13 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) { static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { SCMSTableVgroupMsg *pInfo = pMsg->pCont; - SSuperTableObj *pTable = mgmtGetSuperTable(pInfo->tableId); - - pMsg->pTable = (STableObj *)pTable; + if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetSuperTable(pInfo->tableId); if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } + SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(SCMVgroupInfo) * pTable->vgLen; SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { @@ -1430,17 +1432,21 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { } int32_t sid = taosAllocateId(pVgroup->idPool); - if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); + if (sid <= 0) { + mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId); mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); return; } if (pMsg->retry == 0) { - pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + if (pMsg->pTable == NULL) { + pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + mgmtIncTableRef(pMsg->pTable); + } } else { - pMsg->pTable = mgmtGetTable(pCreate->tableId); + if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pCreate->tableId); } + if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -1456,7 +1462,6 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; newMsg->maxRetry = 5; - mgmtIncTableRef(pMsg->pTable); SRpcMsg rpcMsg = { .handle = newMsg, .pCont = pMDCreate, @@ -1470,8 +1475,8 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; - SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { + if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); + if (pMsg->pVgroup == NULL) { mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); return; @@ -1490,7 +1495,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { pDrop->sid = htonl(pTable->sid); pDrop->uid = htobe64(pTable->uid); - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); mTrace("table:%s, send drop ctable msg", pDrop->tableId); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); @@ -1556,8 +1561,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, - .pObj = pTable, - .rowSize = tsChildTableUpdateSize + .pObj = pTable }; int32_t code = sdbUpdateRow(&oper); @@ -1589,8 +1593,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsChildTableSdb, - .pObj = pTable, - .rowSize = tsChildTableUpdateSize + .pObj = pTable }; int32_t code = sdbUpdateRow(&oper); @@ -1638,21 +1641,21 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) { pMeta->contLen = sizeof(STableMetaMsg) + mgmtSetSchemaFromNormalTable(pMeta->schema, pTable); } - SVgObj *pVgroup = pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { + if (pMsg->pVgroup == NULL) pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); + if (pMsg->pVgroup == NULL) { mError("table:%s, failed to get table meta, db not selected", pTable->info.tableId); return TSDB_CODE_INVALID_VGROUP_ID; } - for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId); + for (int32_t i = 0; i < pMsg->pVgroup->numOfVnodes; ++i) { + SDnodeObj *pDnode = mgmtGetDnode(pMsg->pVgroup->vnodeGid[i].dnodeId); if (pDnode == NULL) break; strcpy(pMeta->vgroup.ipAddr[i].fqdn, pDnode->dnodeFqdn); pMeta->vgroup.ipAddr[i].port = htons(pDnode->dnodePort + TSDB_PORT_DNODESHELL); pMeta->vgroup.numOfIps++; mgmtDecDnodeRef(pDnode); } - pMeta->vgroup.vgId = htonl(pVgroup->vgId); + pMeta->vgroup.vgId = htonl(pMsg->pVgroup->vgId); mTrace("table:%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); @@ -1714,7 +1717,8 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) { mPrint("db:%s, all child tables will be dropped from sdb", pDropDb->name); while (1) { - pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + pLastNode = pNode; + pNode = mgmtGetNextChildTable(pNode, &pTable); if (pTable == NULL) break; if (strncmp(pDropDb->name, pTable->info.tableId, dbNameLen) == 0) { @@ -1742,7 +1746,8 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { mPrint("stable:%s, all child tables will dropped from sdb", pStable->info.tableId, numOfTables); while (1) { - pNode = sdbFetchRow(tsChildTableSdb, pNode, (void **)&pTable); + pLastNode = pNode; + pNode = mgmtGetNextChildTable(pNode, &pTable); if (pTable == NULL) break; if (pTable->superTable == pStable) { @@ -1762,16 +1767,13 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) { mPrint("stable:%s, all child tables:%d is dropped from sdb", pStable->info.tableId, numOfTables); } -static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_t sid) { - SDnodeObj *pObj = mgmtGetDnode(dnodeId); +static SChildTableObj* mgmtGetTableByPos(int32_t vnode, int32_t sid) { SVgObj *pVgroup = mgmtGetVgroup(vnode); + if (pVgroup == NULL) return NULL; - if (pObj == NULL || pVgroup == NULL) { - return NULL; - } - - SChildTableObj *pTable = pVgroup->tableList[sid]; + SChildTableObj *pTable = pVgroup->tableList[sid - 1]; mgmtIncTableRef((STableObj *)pTable); + mgmtDecVgroupRef(pVgroup); return pTable; } @@ -1783,7 +1785,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { pCfg->sid = htonl(pCfg->sid); mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); - SChildTableObj *pTable = mgmtGetTableByPos(pCfg->dnode, pCfg->vnode, pCfg->sid); + SChildTableObj *pTable = mgmtGetTableByPos(pCfg->vnode, pCfg->sid); if (pTable == NULL) { mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_TABLE); @@ -1798,6 +1800,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { mgmtDecTableRef(pTable); return; } + SDnodeObj *pDnode = mgmtGetDnode(pCfg->dnode); SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp); SRpcMsg rpcRsp = { @@ -1808,7 +1811,9 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; mgmtSendMsgToDnode(&ipSet, &rpcRsp); + mgmtDecTableRef(pTable); + mgmtDecDnodeRef(pDnode); } // handle drop child response @@ -1829,8 +1834,8 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { return; } - SVgObj *pVgroup = queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { + if (queueMsg->pVgroup == NULL) queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId); + if (queueMsg->pVgroup == NULL) { mError("table:%s, failed to get vgroup", pTable->info.tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); return; @@ -1849,9 +1854,9 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) { return; } - if (pVgroup->numOfTables <= 0) { - mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); - mgmtDropVgroup(pVgroup, NULL); + if (queueMsg->pVgroup->numOfTables <= 0) { + mPrint("vgId:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId); + mgmtDropVgroup(queueMsg->pVgroup, NULL); } mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); @@ -1928,8 +1933,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) { SChildTableObj *pTable = mgmtGetChildTable(tableId); if (pTable == NULL) continue; - SDbObj *pDb = mgmtGetDbByTableId(tableId); - if (pDb == NULL) continue; + if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(tableId); + if (pMsg->pDb == NULL) continue; int availLen = totalMallocLen - pMultiMeta->contLen; if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS) { @@ -2028,7 +2033,8 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, int32_t prefixLen = strlen(prefix); while (numOfRows < rows) { - pShow->pNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); + mgmtDecTableRef(pTable); + pShow->pNode = mgmtGetNextChildTable(pShow->pNode, &pTable); if (pTable == NULL) break; // not belong to current db @@ -2072,7 +2078,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, cols++; numOfRows++; - mgmtDecTableRef(pTable); } pShow->numOfReads += numOfRows; @@ -2088,7 +2093,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { SCMAlterTableMsg *pAlter = pMsg->pCont; mTrace("table:%s, alter table msg is received from thandle:%p", pAlter->tableId, pMsg->thandle); - pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); + if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); if (pMsg->pDb == NULL || pMsg->pDb->status != TSDB_DB_STATUS_READY) { mError("table:%s, failed to alter table, db not selected", pAlter->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); @@ -2101,7 +2106,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pTable = mgmtGetTable(pAlter->tableId); + if (pMsg->pTable == NULL) pMsg->pTable = mgmtGetTable(pAlter->tableId); if (pMsg->pTable == NULL) { mError("table:%s, failed to alter table, table not exist", pMsg->pTable->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 708f436d0a75b8295128a1bed968f67f81cbf8a7..787459d6675f929d803669aabab07cf0b7d627f8 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -168,8 +168,7 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsUserSdb, - .pObj = pUser, - .rowSize = tsUserUpdateSize + .pObj = pUser }; int32_t code = sdbUpdateRow(&oper); @@ -249,7 +248,7 @@ static int32_t mgmtDropUser(SUserObj *pUser) { } static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); + SUserObj *pUser = mgmtGetUserFromConn(pConn); if (pUser == NULL) { return TSDB_CODE_NO_USER_FROM_CONN; } @@ -298,7 +297,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void char *pWrite; while (numOfRows < rows) { - pShow->pNode = sdbFetchRow(tsUserSdb, pShow->pNode, (void **) &pUser); + pShow->pNode = mgmtGetNextUser(pShow->pNode, &pUser); if (pUser == NULL) break; cols = 0; @@ -329,12 +328,9 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void return numOfRows; } -SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp) { +SUserObj *mgmtGetUserFromConn(void *pConn) { SRpcConnInfo connInfo; if (rpcGetConnInfo(pConn, &connInfo) == 0) { - if (usePublicIp) { - *usePublicIp = (connInfo.serverIp == tsPublicIpInt); - } return mgmtGetUser(connInfo.user); } else { mError("can not get user from conn:%p", pConn); @@ -510,7 +506,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) { while (1) { pLastNode = pNode; - pNode = sdbFetchRow(tsUserSdb, pNode, (void **)&pUser); + pNode = mgmtGetNextUser(pNode, &pUser); if (pUser == NULL) break; if (strncmp(pUser->acct, pAcct->user, acctNameLen) == 0) { diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 3143e6065fe8e27c93c6397c70a8b62a91d8527a..839dce5c38a65528e1ea3c0f8c691738476bce45 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -36,8 +36,8 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" -void *tsVgroupSdb = NULL; -int32_t tsVgUpdateSize = 0; +static void *tsVgroupSdb = NULL; +static int32_t tsVgUpdateSize = 0; static int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -62,6 +62,8 @@ static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) { static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; + + // refer to db SDbObj *pDb = mgmtGetDb(pVgroup->dbName); if (pDb == NULL) { return TSDB_CODE_INVALID_DB; @@ -74,13 +76,13 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) { int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxTables; pVgroup->tableList = calloc(pDb->cfg.maxTables, sizeof(SChildTableObj *)); if (pVgroup->tableList == NULL) { - mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size); + mError("vgId:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size); return -1; } pVgroup->idPool = taosInitIdPool(pDb->cfg.maxTables); if (pVgroup->idPool == NULL) { - mError("vgroup:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId); + mError("vgId:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId); tfree(pVgroup->tableList); return -1; } @@ -101,7 +103,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) { static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) { SVgObj *pVgroup = pOper->pObj; - + if (pVgroup->pDb != NULL) { mgmtRemoveVgroupFromDb(pVgroup); } @@ -140,6 +142,7 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { if (pDnode != NULL) { atomic_add_fetch_32(&pDnode->openVnodes, 1); } + mgmtDecDnodeRef(pDnode); } } @@ -147,14 +150,15 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) { SDbObj *pDb = pVgroup->pDb; if (pDb != NULL) { if (pDb->cfg.maxTables != oldTables) { - mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxTables); + mPrint("vgId:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxTables); taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxTables); int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxTables; pVgroup->tableList = (SChildTableObj **)realloc(pVgroup->tableList, size); } } - mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes); + mgmtDecVgroupRef(pVgroup); + mTrace("vgId:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes); return TSDB_CODE_SUCCESS; } @@ -237,8 +241,7 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) { SSdbOper oper = { .type = SDB_OPER_GLOBAL, .table = tsVgroupSdb, - .pObj = pVgroup, - .rowSize = tsVgUpdateSize + .pObj = pVgroup }; sdbUpdateRow(&oper); @@ -261,7 +264,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo if (!dnodeExist) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp); - mError("vgroup:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId); + mError("vgId:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId); mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL); return; } @@ -273,7 +276,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo } if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) { - mError("dnode:%d, vgroup:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d", + mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d", pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion, pVgroup->numOfVnodes); mgmtSendCreateVgroupMsg(pVgroup, NULL); @@ -317,9 +320,9 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { return; } - mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); + mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); + mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId); } pMsg->ahandle = pVgroup; @@ -331,7 +334,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) { if (ahandle != NULL) { mgmtSendDropVgroupMsg(pVgroup, ahandle); } else { - mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); + mTrace("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes); mgmtSendDropVgroupMsg(pVgroup, NULL); SSdbOper oper = { .type = SDB_OPER_GLOBAL, @@ -379,6 +382,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { if (pShow->payloadLen > 0 ) { pTable = mgmtGetTable(pShow->payload); if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) { + mgmtDecTableRef(pTable); return TSDB_CODE_INVALID_TABLE_ID; } mgmtDecTableRef(pTable); @@ -505,25 +509,28 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo } void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { - if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) { - pVgroup->tableList[pTable->sid] = pTable; + if (pTable->sid >= 1 && pVgroup->tableList[pTable->sid - 1] == NULL) { + pVgroup->tableList[pTable->sid - 1] = pTable; taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid); pVgroup->numOfTables++; } - if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables) - mgmtAddVgroupIntoDbTail(pVgroup); + if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables) { + mgmtMoveVgroupToTail(pVgroup); + } + + mgmtIncVgroupRef(pVgroup); } void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) { - if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) { - pVgroup->tableList[pTable->sid] = NULL; + if (pTable->sid >= 1 && pVgroup->tableList[pTable->sid - 1] != NULL) { + pVgroup->tableList[pTable->sid - 1] = NULL; taosFreeId(pVgroup->idPool, pTable->sid); pVgroup->numOfTables--; } - if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables) - mgmtAddVgroupIntoDbTail(pVgroup); + mgmtMoveVgroupToHead(pVgroup); + mgmtDecVgroupRef(pVgroup); } SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) { @@ -588,7 +595,7 @@ SRpcIpSet mgmtGetIpSetFromIp(char *ep) { } void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { - mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle); + mTrace("vgId:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle); SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup); SRpcMsg rpcMsg = { .handle = ahandle, @@ -601,7 +608,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { } void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { - mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + mTrace("vgId:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle); @@ -619,7 +626,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { } SVgObj *pVgroup = queueMsg->ahandle; - mTrace("vgroup:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + mTrace("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected, queueMsg->thandle, rpcMsg->handle); @@ -654,7 +661,7 @@ static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) { } void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { - mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", vgId, ahandle); + mTrace("vgId:%d, send drop vnode msg, ahandle:%p", vgId, ahandle); SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(vgId); SRpcMsg rpcMsg = { .handle = ahandle, @@ -667,7 +674,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { } static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { - mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); + mTrace("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp); mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle); @@ -675,7 +682,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { } static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { - mTrace("drop vnode rsp is received"); + mTrace("drop vnode rsp is received, handle:%p", rpcMsg->handle); if (rpcMsg->handle == NULL) return; SQueuedMsg *queueMsg = rpcMsg->handle; @@ -686,7 +693,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { } SVgObj *pVgroup = queueMsg->ahandle; - mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", + mTrace("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p", pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected, queueMsg->thandle, rpcMsg->handle); @@ -736,7 +743,33 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { mgmtSendCreateVnodeMsg(pVgroup, &ipSet, NULL); } -void mgmtDropAllVgroups(SDbObj *pDropDb) { +void mgmtDropAllDnodeVgroups(SDnodeObj *pDropDnode) { + void * pNode = NULL; + void * pLastNode = NULL; + SVgObj *pVgroup = NULL; + int32_t numOfVgroups = 0; + + while (1) { + pLastNode = pNode; + pNode = mgmtGetNextVgroup(pNode, &pVgroup); + if (pVgroup == NULL) break; + + if (pVgroup->vnodeGid[0].dnodeId == pDropDnode->dnodeId) { + SSdbOper oper = { + .type = SDB_OPER_LOCAL, + .table = tsVgroupSdb, + .pObj = pVgroup, + }; + sdbDeleteRow(&oper); + pNode = pLastNode; + numOfVgroups++; + continue; + } + mgmtDecVgroupRef(pVgroup); + } +} + +void mgmtDropAllDbVgroups(SDbObj *pDropDb) { void *pNode = NULL; void *pLastNode = NULL; int32_t numOfVgroups = 0; @@ -744,7 +777,8 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { mPrint("db:%s, all vgroups will be dropped from sdb", pDropDb->name); while (1) { - pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); + pLastNode = pNode; + pNode = mgmtGetNextVgroup(pNode, &pVgroup); if (pVgroup == NULL) break; if (pVgroup->pDb == pDropDb) {