From 75b157602402c4ee07c8defbfe834ada3baa31f4 Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 3 Apr 2020 21:04:42 +0800 Subject: [PATCH] [TD-93] add ref to sdb --- src/inc/mnode.h | 6 +- src/mnode/CMakeLists.txt | 4 +- src/mnode/inc/mgmtProfile.h | 2 + src/mnode/inc/mgmtTable.h | 4 +- src/mnode/inc/mgmtVgroup.h | 4 +- src/mnode/src/mgmtAcct.c | 41 +----- src/mnode/src/mgmtDb.c | 43 +++--- src/mnode/src/mgmtDnode.c | 1 + src/mnode/src/mgmtProfile.c | 46 ++++++- src/mnode/src/mgmtSdb.c | 4 +- src/mnode/src/mgmtShell.c | 50 +++---- src/mnode/src/mgmtTable.c | 261 ++++++++++++++---------------------- src/mnode/src/mgmtUser.c | 6 +- src/mnode/src/mgmtVgroup.c | 20 ++- 14 files changed, 217 insertions(+), 275 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index c7f4b07efa..ca93bf4661 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -169,7 +169,6 @@ typedef struct _db_obj { int8_t reserved[15]; int8_t updateEnd[1]; int32_t refCount; - struct _db_obj *prev, *next; int32_t numOfVgroups; int32_t numOfTables; int32_t numOfSuperTables; @@ -222,7 +221,6 @@ typedef struct _acctObj { int8_t updateEnd[1]; int32_t refCount; SAcctInfo acctInfo; - SDbObj * pHead; pthread_mutex_t mutex; } SAcctObj; @@ -252,8 +250,10 @@ typedef struct { void *ahandle; void *thandle; void *pCont; - SDbObj *pDb; SUserObj *pUser; + SDbObj *pDb; + SVgObj *pVgroup; + STableInfo *pTable; } SQueuedMsg; int32_t mgmtInitSystem(); diff --git a/src/mnode/CMakeLists.txt b/src/mnode/CMakeLists.txt index 3d8e712edd..b830695f52 100644 --- a/src/mnode/CMakeLists.txt +++ b/src/mnode/CMakeLists.txt @@ -14,6 +14,4 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ADD_LIBRARY(mnode ${SRC}) TARGET_LINK_LIBRARIES(mnode trpc tutil pthread) -ENDIF () - - +ENDIF () \ No newline at end of file diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index a40f572f8b..b05877f32c 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle); void mgmtSaveQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle); +void * mgmtMallocQueuedMsg(SRpcMsg *rpcMsg); +void * mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg); void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index 41ddb3b9f3..ddbbfb4a70 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -28,8 +28,8 @@ extern "C" { int32_t mgmtInitTables(); void mgmtCleanUpTables(); STableInfo* mgmtGetTable(char* tableId); -void mgmtIncTableRef(STableInfo *pTable); -void mgmtDecTableRef(STableInfo *pTable); +void mgmtIncTableRef(void *pTable); +void mgmtDecTableRef(void *pTable); void mgmtDropAllChildTables(SDbObj *pDropDb); void mgmtDropAllSuperTables(SDbObj *pDropDb); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index de0648da44..d0b1e0de97 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -27,9 +27,11 @@ extern "C" { int32_t mgmtInitVgroups(); void mgmtCleanUpVgroups(); SVgObj *mgmtGetVgroup(int32_t vgId); +void mgmtIncVgroupRef(SVgObj *pVgroup); +void mgmtDecVgroupRef(SVgObj *pVgroup); void mgmtDropAllVgroups(SDbObj *pDropDb); -void mgmtCreateVgroup(SQueuedMsg *pMsg); +void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); diff --git a/src/mnode/src/mgmtAcct.c b/src/mnode/src/mgmtAcct.c index 0d863e5249..0436db0ea8 100644 --- a/src/mnode/src/mgmtAcct.c +++ b/src/mnode/src/mgmtAcct.c @@ -38,56 +38,25 @@ int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUC #endif void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) { - pthread_mutex_lock(&pAcct->mutex); - pDb->next = pAcct->pHead; - pDb->prev = NULL; + atomic_add_fetch_32(&pAcct->acctInfo.numOfDbs, 1); pDb->pAcct = pAcct; - - if (pAcct->pHead) { - pAcct->pHead->prev = pDb; - } - - pAcct->pHead = pDb; - pAcct->acctInfo.numOfDbs++; - pthread_mutex_unlock(&pAcct->mutex); - acctIncRef(pAcct); } void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) { - pthread_mutex_lock(&pAcct->mutex); - if (pDb->prev) { - pDb->prev->next = pDb->next; - } - - if (pDb->next) { - pDb->next->prev = pDb->prev; - } - - if (pDb->prev == NULL) { - pAcct->pHead = pDb->next; - } - - pAcct->acctInfo.numOfDbs--; - pthread_mutex_unlock(&pAcct->mutex); - + atomic_sub_fetch_32(&pAcct->acctInfo.numOfDbs, 1); + pDb->pAcct = NULL; acctDecRef(pAcct); } void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { - pthread_mutex_lock(&pAcct->mutex); - pAcct->acctInfo.numOfUsers++; + atomic_add_fetch_32(&pAcct->acctInfo.numOfUsers, 1); pUser->pAcct = pAcct; - pthread_mutex_unlock(&pAcct->mutex); - acctIncRef(pAcct); } void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) { - pthread_mutex_lock(&pAcct->mutex); - pAcct->acctInfo.numOfUsers--; + atomic_sub_fetch_32(&pAcct->acctInfo.numOfUsers, 1); pUser->pAcct = NULL; - pthread_mutex_unlock(&pAcct->mutex); - acctDecRef(pAcct); } \ No newline at end of file diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 69c61531a0..ec1ff62862 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -32,7 +32,7 @@ #include "mgmtUser.h" #include "mgmtVgroup.h" -void * tsDbSdb = NULL; +static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); @@ -55,8 +55,6 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { pDb->pHead = NULL; pDb->pTail = NULL; - pDb->prev = NULL; - pDb->next = NULL; pDb->numOfVgroups = 0; pDb->numOfTables = 0; pDb->numOfSuperTables = 0; @@ -288,8 +286,9 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { return code; } - SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db); + SDbObj *pDb = mgmtGetDb(pCreate->db); if (pDb != NULL) { + mgmtDecDbRef(pDb); return TSDB_CODE_DB_ALREADY_EXIST; } @@ -517,16 +516,14 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) } pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs; - pShow->pNode = pUser->pAcct->pHead; + mgmtDecUserRef(pUser); return 0; } static char *mgmtGetDbStr(char *src) { char *pos = strstr(src, TS_PATH_DELIMITER); - return ++pos; } @@ -539,14 +536,8 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * if (pUser == NULL) return 0; while (numOfRows < rows) { - pDb = (SDbObj *)pShow->pNode; + pShow->pNode = sdbFetchRow(tsDbSdb, pShow->pNode, (void **) &pDb); if (pDb == NULL) break; - pShow->pNode = (void *)pDb->next; - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) { - continue; - } - } cols = 0; @@ -643,25 +634,31 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * cols++; numOfRows++; + mgmtDecDbRef(pDb); } pShow->numOfReads += numOfRows; + mgmtDecUserRef(pUser); return numOfRows; } void mgmtAddSuperTableIntoDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfSuperTables, 1); + mgmtIncDbRef(pDb); } void mgmtRemoveSuperTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfSuperTables, -1); + mgmtDecDbRef(pDb); } void mgmtAddTableIntoDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, 1); + mgmtIncDbRef(pDb); } void mgmtRemoveTableFromDb(SDbObj *pDb) { atomic_add_fetch_32(&pDb->numOfTables, -1); + mgmtDecDbRef(pDb); } static int32_t mgmtSetDbDirty(SDbObj *pDb) { @@ -800,15 +797,16 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { if (code != TSDB_CODE_SUCCESS) { mError("db:%s, failed to alter, invalid db option", pAlter->db); mgmtSendSimpleResp(pMsg->thandle, code); + mgmtDecDbRef(pDb); + return; } - SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); - memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); - pMsg->pCont = NULL; - SVgObj *pVgroup = pDb->pHead; if (pVgroup != NULL) { mPrint("vgroup:%d, will be altered", pVgroup->vgId); + SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); + memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); + memset(pMsg, 0, sizeof(SQueuedMsg)); newMsg->ahandle = pVgroup; newMsg->expected = pVgroup->numOfVnodes; mgmtAlterVgroup(pVgroup, newMsg); @@ -817,9 +815,9 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { mTrace("db:%s, all vgroups is altered", pDb->name); - mgmtSendSimpleResp(newMsg->thandle, TSDB_CODE_SUCCESS); - rpcFreeCont(newMsg->pCont); - free(newMsg); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + rpcFreeCont(pMsg->pCont); + mgmtDecDbRef(pDb); } static void mgmtDropDb(void *handle, void *tmrId) { @@ -876,6 +874,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { mError("db:%s, can't drop monitor database", pDrop->db); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); + mgmtDecDbRef(pDb); return; } @@ -883,6 +882,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { if (code != TSDB_CODE_SUCCESS) { mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); mgmtSendSimpleResp(pMsg->thandle, code); + mgmtDecDbRef(pDb); return; } @@ -919,6 +919,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) { mgmtSetDbDirty(pDb); numOfDbs++; } + mgmtDecDbRef(pDb); } mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 53f0a668b4..65531bd36d 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -215,6 +215,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); } + mgmtDecVgroupRef(pVgroup); } if (pDnode->status != TSDB_DN_STATUS_READY) { diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 44bd45faea..ae921a0505 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -16,10 +16,13 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosmsg.h" +#include "mgmtDb.h" #include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtShell.h" +#include "mgmtTable.h" #include "mgmtUser.h" +#include "mgmtVgroup.h" int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); @@ -763,12 +766,47 @@ int32_t mgmtInitProfile() { void mgmtCleanUpProfile() { } +void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) { + bool usePublicIp = false; + SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); + if (pUser == NULL) { + return NULL; + } + + SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg)); + pMsg->thandle = rpcMsg->handle; + pMsg->msgType = rpcMsg->msgType; + pMsg->contLen = rpcMsg->contLen; + pMsg->pCont = rpcMsg->pCont; + pMsg->pUser = pUser; + pMsg->usePublicIp = usePublicIp; + + return pMsg; +} + void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { if (pMsg != NULL) { - if (pMsg->pCont != NULL) { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } + rpcFreeCont(pMsg->pCont); + if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser); + if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb); + if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup); + if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable); free(pMsg); } +} + +void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { + SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg)); + + pDestMsg->thandle = pSrcMsg->thandle; + pDestMsg->msgType = pSrcMsg->msgType; + pDestMsg->pCont = pSrcMsg->pCont; + pDestMsg->contLen = pSrcMsg->contLen; + pDestMsg->pUser = pSrcMsg->pUser; + pDestMsg->usePublicIp = pSrcMsg->usePublicIp; + + pSrcMsg->pCont = NULL; + pSrcMsg->pUser = NULL; + + return pDestMsg; } \ No newline at end of file diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 0de540ae78..d6a1cf5260 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); atomic_add_fetch_32(pRefCount, 1); - sdbTrace("table:%s, add ref:%d to record:%s", pTable->tableName, *pRefCount, sdbGetkeyStr(pTable, pRow)); + sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); } } @@ -444,7 +444,7 @@ void sdbDecRef(void *handle, void *pRow) { SSdbTable *pTable = handle; int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); - sdbTrace("table:%s, def ref:%d from record:%s", pTable->tableName, *pRefCount, sdbGetkeyStr(pTable, pRow)); + sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount); if (refCount <= 0) { SSdbOperDesc oper = {.pObj = pRow}; (*pTable->destroyFp)(&oper); diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index d2bd78671d..b4ffd541f4 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -117,9 +117,7 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { void mgmtProcessTranRequest(SSchedMsg *sched) { SQueuedMsg *queuedMsg = sched->msg; (*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg); - mgmtDecUserRef(queuedMsg->pUser); - rpcFreeCont(queuedMsg->pCont); - free(queuedMsg); + mgmtFreeQueuedMsg(queuedMsg); } void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { @@ -134,6 +132,12 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } + if (mgmtCheckRedirect(rpcMsg->handle)) { + // send resp in redirect func + rpcFreeCont(rpcMsg->pCont); + return; + } + if (!mgmtInServerStatus()) { mgmtProcessMsgWhileNotReady(rpcMsg); rpcFreeCont(rpcMsg->pCont); @@ -142,6 +146,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED); + rpcFreeCont(rpcMsg->pCont); return; } @@ -151,45 +156,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - bool usePublicIp = false; - SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); - if (pUser == NULL) { + SQueuedMsg *pMsg = mgmtMallocQueuedMsg(rpcMsg); + if (pMsg == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); rpcFreeCont(rpcMsg->pCont); return; } - + if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { - SQueuedMsg queuedMsg = {0}; - queuedMsg.thandle = rpcMsg->handle; - queuedMsg.msgType = rpcMsg->msgType; - queuedMsg.contLen = rpcMsg->contLen; - queuedMsg.pCont = rpcMsg->pCont; - queuedMsg.pUser = pUser; - queuedMsg.usePublicIp = usePublicIp; - (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg); - mgmtDecUserRef(pUser); - rpcFreeCont(rpcMsg->pCont); + (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg); + mgmtFreeQueuedMsg(pMsg); } else { - SQueuedMsg *queuedMsg = calloc(1, sizeof(SQueuedMsg)); - queuedMsg->thandle = rpcMsg->handle; - queuedMsg->msgType = rpcMsg->msgType; - queuedMsg->contLen = rpcMsg->contLen; - queuedMsg->pCont = rpcMsg->pCont; - queuedMsg->pUser = pUser; - queuedMsg->usePublicIp = usePublicIp; - mgmtAddToShellQueue(queuedMsg); + if (!pMsg->pUser->writeAuth) { + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + mgmtFreeQueuedMsg(pMsg); + } else { + mgmtAddToShellQueue(pMsg); + } } } static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { SCMShowMsg *pShowMsg = pMsg->pCont; - if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) { - if (mgmtCheckRedirect(pMsg->thandle)) { - return; - } - } - if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE); return; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 3e0036571a..0238d829a0 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -53,35 +53,38 @@ static int32_t tsSuperTableUpdateSize; static void * mgmtGetChildTable(char *tableId); static void * mgmtGetSuperTable(char *tableId); -void mgmtCreateChildTable(SQueuedMsg *pMsg); -void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable); void mgmtAlterChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable); void mgmtDropAllChildTables(SDbObj *pDropDb); void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable); -void mgmtCreateSuperTable(SQueuedMsg *pMsg); -void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); -void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable); +static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg); + +static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); +static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); +static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg); +static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); + +void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb); void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable); void mgmtDropAllSuperTables(SDbObj *pDropDb); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *queueMsg); -static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg); static int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *queueMsg); -static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg); -static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg); static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg); static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *pConn); -static void mgmtProcessCreateTableMsg(SQueuedMsg *queueMsg); -static void mgmtProcessDropTableMsg(SQueuedMsg *queueMsg); + static void mgmtProcessAlterTableMsg(SQueuedMsg *queueMsg); static void mgmtProcessTableMetaMsg(SQueuedMsg *queueMsg); @@ -450,7 +453,7 @@ int32_t mgmtInitTables() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLES_META, mgmtProcessMultiTableMetaMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_TABLE, mgmtGetShowTableMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_TABLE, mgmtRetrieveShowTables); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateTableRsp); + mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropTableRsp); mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); @@ -490,7 +493,8 @@ STableInfo *mgmtGetTable(char *tableId) { return NULL; } -void mgmtIncTableRef(STableInfo *pTable) { +void mgmtIncTableRef(void *p1) { + STableInfo *pTable = (STableInfo *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbIncRef(tsSuperTableSdb, pTable); } else { @@ -498,7 +502,8 @@ void mgmtIncTableRef(STableInfo *pTable) { } } -void mgmtDecTableRef(STableInfo *pTable) { +void mgmtDecTableRef(void *p1) { + STableInfo *pTable = (STableInfo *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbDecRef(tsSuperTableSdb, pTable); } else { @@ -526,13 +531,16 @@ void mgmtExtractTableName(char* tableId, char* name) { static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; - mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle); - - if (mgmtCheckRedirect(pMsg->thandle)) return; - - if (!pMsg->pUser->writeAuth) { - mError("table:%s, failed to create, no rights", pCreate->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + + pMsg->pTable = mgmtGetTable(pCreate->tableId); + if (pMsg->pTable != NULL) { + if (pCreate->igExists) { + mTrace("table:%s, is already exist", pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); + } else { + mError("table:%s, failed to create, table already exist", pCreate->tableId); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_TABLE_ALREADY_EXIST); + } return; } @@ -543,42 +551,19 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { return; } - STableInfo *pTable = mgmtGetTable(pCreate->tableId); - if (pTable != NULL) { - if (pCreate->igExists) { - mTrace("table:%s is already exist", pCreate->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); - mgmtDecTableRef(pTable); - return; - } else { - mError("table:%s, failed to create, table already exist", pCreate->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_TABLE_ALREADY_EXIST); - mgmtDecTableRef(pTable); - return; - } - } - if (pCreate->numOfTags != 0) { - mTrace("table:%s, is a stable", pCreate->tableId); - mgmtCreateSuperTable(pMsg); + mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle); + mgmtProcessCreateSuperTableMsg(pMsg); } else { - mTrace("table:%s, is a ctable", pCreate->tableId); - mgmtCreateChildTable(pMsg); + mTrace("table:%s, create msg is received from thandle:%p", pCreate->tableId, pMsg->thandle); + mgmtProcessCreateChildTableMsg(pMsg); } } static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { SCMDropTableMsg *pDrop = pMsg->pCont; - mTrace("table:%s, drop table msg is received from thandle:%p", pDrop->tableId, pMsg->thandle); - if (mgmtCheckRedirect(pMsg->thandle)) return; - if (!pMsg->pUser->writeAuth) { - mError("table:%s, failed to drop, no rights", pDrop->tableId); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); - return; - } - pMsg->pDb = mgmtGetDbByTableId(pDrop->tableId); if (pMsg->pDb == NULL || pMsg->pDb->dirty) { mError("table:%s, failed to drop table, db not selected", pDrop->tableId); @@ -592,8 +577,8 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { return; } - STableInfo *pTable = mgmtGetTable(pDrop->tableId); - if (pTable == NULL) { + pMsg->pTable = mgmtGetTable(pDrop->tableId); + if (pMsg->pTable == NULL) { if (pDrop->igNotExists) { mTrace("table:%s, table is not exist, think drop success", pDrop->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); @@ -605,12 +590,12 @@ static void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) { } } - if (pTable->type == TSDB_SUPER_TABLE) { - mTrace("table:%s, start to drop stable", pDrop->tableId); - mgmtDropSuperTable(pMsg, (SSuperTableObj *)pTable); + if (pMsg->pTable->type == TSDB_SUPER_TABLE) { + mTrace("table:%s, start to drop ctable", pDrop->tableId); + mgmtProcessDropSuperTableMsg(pMsg); } else { mTrace("table:%s, start to drop ctable", pDrop->tableId); - mgmtDropChildTable(pMsg, (SChildTableObj *)pTable); + mgmtProcessDropChildTableMsg(pMsg); } } @@ -626,14 +611,14 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pDb = mgmtGetDbByTableId(pAlter->tableId); - if (pMsg->pDb == NULL || pMsg->pDb->dirty) { + SDbObj *pDb = mgmtGetDbByTableId(pAlter->tableId); + if (pDb == NULL || pDb->dirty) { mError("table:%s, failed to alter table, db not selected", pAlter->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; } - if (mgmtCheckIsMonitorDB(pMsg->pDb->name, tsMonitorDbName)) { + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { mError("table:%s, failed to alter table, its log db", pAlter->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); return; @@ -671,8 +656,8 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { SCMTableInfoMsg *pInfo = pMsg->pCont; mTrace("table:%s, table meta msg is received from thandle:%p", pInfo->tableId, pMsg->thandle); - pMsg->pDb = mgmtGetDbByTableId(pInfo->tableId); - if (pMsg->pDb == NULL || pMsg->pDb->dirty) { + SDbObj *pDb = mgmtGetDbByTableId(pInfo->tableId); + if (pDb == NULL || pDb->dirty) { mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); return; @@ -685,16 +670,16 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) { if (pTable->type != TSDB_SUPER_TABLE) { mgmtGetChildTableMeta(pMsg, (SChildTableObj *)pTable); } else { - mgmtGetSuperTableMeta(pMsg, (SSuperTableObj *)pTable); + mgmtGetSuperTableMeta(pMsg, (SSuperTableObj *)pTable, pDb); } } } - -void mgmtCreateSuperTable(SQueuedMsg *pMsg) { +static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; SSuperTableObj *pStable = (SSuperTableObj *)calloc(1, sizeof(SSuperTableObj)); if (pStable == NULL) { + mError("table:%s, failed to create, no enough memory", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); return; } @@ -712,7 +697,7 @@ void mgmtCreateSuperTable(SQueuedMsg *pMsg) { pStable->schema = (SSchema *)calloc(1, schemaSize); if (pStable->schema == NULL) { free(pStable); - mError("stable:%s, no schema input", pCreate->tableId); + mError("table:%s, failed to create, no schema input", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE); return; } @@ -735,18 +720,19 @@ void mgmtCreateSuperTable(SQueuedMsg *pMsg) { int32_t code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { mgmtDestroySuperTable(pStable); + mError("table:%s, failed to create, sdb error", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SDB_ERROR); } else { - mLPrint("stable:%s, is created, tags:%d cols:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns); + mLPrint("table:%s, is created, tags:%d cols:%d", pStable->info.tableId, pStable->numOfTags, pStable->numOfColumns); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); } } -void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pStable) { +void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { + SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable; if (pStable->numOfTables != 0) { mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); - mgmtDecTableRef((STableInfo *)pStable); } else { SSdbOperDesc oper = { .type = SDB_OPER_TYPE_GLOBAL, @@ -756,7 +742,6 @@ void mgmtDropSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pStable) { int32_t code = sdbDeleteRow(&oper); mLPrint("stable:%s, is dropped from sdb, result:%s", pStable->info.tableId, tstrerror(code)); mgmtSendSimpleResp(pMsg->thandle, code); - mgmtDecTableRef((STableInfo *)pStable); } } @@ -826,7 +811,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i return TSDB_CODE_SUCCESS; } -static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) { +static int32_t mgmtProcessDropSuperTableMsgTag(SSuperTableObj *pStable, char *tagName) { int32_t col = mgmtFindSuperTableTagIndex(pStable, tagName); if (col <= 0 || col >= pStable->numOfTags) { return TSDB_CODE_APP_ERROR; @@ -953,7 +938,7 @@ static int32_t mgmtAddSuperTableColumn(SSuperTableObj *pStable, SSchema schema[] return TSDB_CODE_SUCCESS; } -static int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pStable, char *colName) { +static int32_t mgmtProcessDropSuperTableMsgColumnByName(SSuperTableObj *pStable, char *colName) { int32_t col = mgmtFindSuperTableColumnIndex(pStable, colName); if (col < 0) { return TSDB_CODE_APP_ERROR; @@ -1034,6 +1019,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, pShow->numOfRows = pDb->numOfSuperTables; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + mgmtDecDbRef(pDb); return 0; } @@ -1048,14 +1034,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v SDbObj *pDb = mgmtGetDb(pShow->db); if (pDb == NULL) return 0; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) { - return 0; - } - } - strcpy(prefix, pDb->name); strcat(prefix, TS_PATH_DELIMITER); prefixLen = strlen(prefix); @@ -1103,6 +1081,8 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v } pShow->numOfReads += numOfRows; + mgmtDecDbRef(pDb); + return numOfRows; } @@ -1148,9 +1128,7 @@ int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable) { return (pTable->numOfColumns + pTable->numOfTags) * sizeof(SSchema); } -void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable) { - SDbObj *pDb = pMsg->pDb; - +void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable, SDbObj *pDb) { STableMetaMsg *pMeta = rpcMallocCont(sizeof(STableMetaMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS); pMeta->uid = htobe64(pTable->uid); pMeta->sversion = htons(pTable->sversion); @@ -1170,7 +1148,7 @@ void mgmtGetSuperTableMeta(SQueuedMsg *pMsg, SSuperTableObj *pTable) { rpcSendResponse(&rpcRsp); mTrace("stable:%%s, uid:%" PRIu64 " table meta is retrieved", pTable->info.tableId, pTable->uid); - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); } static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) { @@ -1201,13 +1179,13 @@ void mgmtAlterSuperTable(SQueuedMsg *pMsg, SSuperTableObj *pTable) { if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { code = mgmtAddSuperTableTag((SSuperTableObj *) pTable, pAlter->schema, 1); } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { - code = mgmtDropSuperTableTag((SSuperTableObj *) pTable, pAlter->schema[0].name); + code = mgmtProcessDropSuperTableMsgTag((SSuperTableObj *) pTable, pAlter->schema[0].name); } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { code = mgmtModifySuperTableTagNameByName((SSuperTableObj *) pTable, pAlter->schema[0].name, pAlter->schema[1].name); } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { code = mgmtAddSuperTableColumn((SSuperTableObj *) pTable, pAlter->schema, 1); } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - code = mgmtDropSuperTableColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name); + code = mgmtProcessDropSuperTableMsgColumnByName((SSuperTableObj *) pTable, pAlter->schema[0].name); } else {} mgmtSendSimpleResp(pMsg->thandle, code); @@ -1217,7 +1195,6 @@ static void mgmtProcessDropStableRsp(SRpcMsg *rpcMsg) { mTrace("drop stable rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code); } - static void *mgmtBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableObj *pTable) { char * pTagData = NULL; int32_t tagDataLen = 0; @@ -1369,51 +1346,44 @@ static SChildTableObj* mgmtDoCreateChildTable(SCMCreateTableMsg *pCreate, SVgObj return pTable; } -void mgmtCreateChildTable(SQueuedMsg *pMsg) { +static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; - int32_t code = grantCheck(TSDB_GRANT_TIMESERIES); if (code != TSDB_CODE_SUCCESS) { - mError("table:%s, failed to create, grant not", pCreate->tableId); + mError("table:%s, failed to create, grant timeseries failed", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, code); return; } - SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg)); - memcpy(newMsg, pMsg, sizeof(SQueuedMsg)); - pMsg->pCont = NULL; - - SVgObj *pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); - if (pVgroup == NULL) { + pMsg->pVgroup = mgmtGetAvailableVgroup(pMsg->pDb); + if (pMsg->pVgroup == NULL) { mTrace("table:%s, start to create a new vgroup", pCreate->tableId); - mgmtCreateVgroup(newMsg); + mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); return; } - int32_t sid = taosAllocateId(pVgroup->idPool); + int32_t sid = taosAllocateId(pMsg->pVgroup->idPool); if (sid < 0) { - mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId); - mgmtCreateVgroup(newMsg); + mTrace("tables:%s, no enough sid in vgroup:%d", pMsg->pVgroup->vgId); + mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb); return; } - SChildTableObj *pTable = mgmtDoCreateChildTable(pCreate, pVgroup, sid); - if (pTable == NULL) { + pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pMsg->pVgroup, sid); + if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); - mgmtFreeQueuedMsg(newMsg); return; } - mgmtIncTableRef((STableInfo *)pTable); - SMDCreateTableMsg *pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pTable); + SMDCreateTableMsg *pMDCreate = mgmtBuildCreateChildTableMsg(pCreate, (SChildTableObj *) pMsg->pTable); if (pMDCreate == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); - mgmtFreeQueuedMsg(newMsg); - mgmtDecTableRef((STableInfo *)pTable); return; } - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); + SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); + newMsg->ahandle = pMsg->pTable; SRpcMsg rpcMsg = { .handle = newMsg, .pCont = pMDCreate, @@ -1422,24 +1392,22 @@ void mgmtCreateChildTable(SQueuedMsg *pMsg) { .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; - newMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); } -void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable) { - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { - mError("ctable:%s, failed to drop child table, vgroup not exist", pTable->info.tableId); +void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { + SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; + pMsg->pVgroup = mgmtGetVgroup(pTable->vgId); + if (pMsg->pVgroup == NULL) { + mError("table:%s, failed to drop ctable, vgroup not exist", pTable->info.tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS); - mgmtDecTableRef((STableInfo *)pTable); return; } SMDDropTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropTableMsg)); if (pDrop == NULL) { - mError("ctable:%s, failed to drop child table, no enough memory", pTable->info.tableId); + mError("table:%s, failed to drop ctable, no enough memory", pTable->info.tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); - mgmtDecTableRef((STableInfo *)pTable); return; } @@ -1449,22 +1417,22 @@ void mgmtDropChildTable(SQueuedMsg *pMsg, SChildTableObj *pTable) { pDrop->sid = htonl(pTable->sid); pDrop->uid = htobe64(pTable->uid); - SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); + SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pMsg->pVgroup); - mTrace("ctable:%s, send drop table msg", pDrop->tableId); + mTrace("table:%s, send drop ctable msg", pDrop->tableId); + SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); + newMsg->ahandle = pMsg->pTable; SRpcMsg rpcMsg = { - .handle = pMsg, + .handle = newMsg, .pCont = pDrop, .contLen = sizeof(SMDDropTableMsg), .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; - pMsg->ahandle = pTable; mgmtSendMsgToDnode(&ipSet, &rpcMsg); } - int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { // TODO: send message to dnode // int32_t col = mgmtFindSuperTableTagIndex(pTable->superTable, tagName); @@ -1672,7 +1640,7 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { if (pDb == NULL || pDb->dirty) { mError("table:%s, failed to get table meta, db not selected", pInfo->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_DB_NOT_SELECTED); - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); return; } @@ -1709,7 +1677,7 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { if (pMeta == NULL) { mError("table:%s, failed to get table meta, no enough memory", pTable->info.tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY); - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); return; } @@ -1722,7 +1690,7 @@ void mgmtGetChildTableMeta(SQueuedMsg *pMsg, SChildTableObj *pTable) { }; pMeta->contLen = htons(pMeta->contLen); rpcSendResponse(&rpcRsp); - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); } void mgmtDropAllChildTables(SDbObj *pDropDb) { @@ -1816,7 +1784,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { SMDCreateTableMsg *pMDCreate = NULL; pMDCreate = mgmtBuildCreateChildTableMsg(NULL, (SChildTableObj *) pTable); if (pMDCreate == NULL) { - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); return; } @@ -1829,7 +1797,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; mgmtSendMsgToDnode(&ipSet, &rpcRsp); - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); } static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { @@ -1845,16 +1813,14 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { mError("table:%s, failed to drop in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); free(queueMsg); - mgmtDecTableRef((STableInfo *)pTable); + mgmtDecTableRef(pTable); return; } - SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId); - if (pVgroup == NULL) { + queueMsg->pVgroup = mgmtGetVgroup(pTable->vgId); + if (queueMsg->pVgroup == NULL) { mError("table:%s, failed to get vgroup", pTable->info.tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_INVALID_VGROUP_ID); - free(queueMsg); - mgmtDecTableRef((STableInfo *)pTable); return; } @@ -1868,22 +1834,19 @@ static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) { if (code != TSDB_CODE_SUCCESS) { mError("table:%s, update ctables sdb error", pTable->info.tableId); mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SDB_ERROR); - free(queueMsg); - mgmtDecTableRef((STableInfo *)pTable); return; } - if (pVgroup->numOfTables <= 0) { - mPrint("vgroup:%d, all tables is dropped, drop vgroup", pVgroup->vgId); - mgmtDropVgroup(pVgroup, NULL); + if (queueMsg->pVgroup->numOfTables <= 0) { + mPrint("vgroup:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId); + mgmtDropVgroup(queueMsg->pVgroup, NULL); } mgmtSendSimpleResp(queueMsg->thandle, TSDB_CODE_SUCCESS); - free(queueMsg); - mgmtDecTableRef((STableInfo *)pTable); + mgmtFreeQueuedMsg(queueMsg); } -static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { +static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { if (rpcMsg->handle == NULL) return; SQueuedMsg *queueMsg = rpcMsg->handle; @@ -1903,27 +1866,17 @@ static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) { mError("table:%s, failed to create in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); - mgmtDecTableRef((STableInfo *)pTable); } else { mTrace("table:%s, created in dnode", pTable->info.tableId); if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { - SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); - newMsg->msgType = queueMsg->msgType; - newMsg->thandle = queueMsg->thandle; - newMsg->pDb = queueMsg->pDb; - newMsg->pUser = queueMsg->pUser; - newMsg->contLen = queueMsg->contLen; - newMsg->pCont = rpcMallocCont(newMsg->contLen); - memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); mTrace("table:%s, start to get meta", pTable->info.tableId); - mgmtAddToShellQueue(newMsg); + mgmtAddToShellQueue(mgmtCloneQueuedMsg(queueMsg)); } else { mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); - mgmtDecTableRef((STableInfo *)pTable); } } - free(queueMsg); + mgmtFreeQueuedMsg(queueMsg); } static void mgmtProcessAlterTableRsp(SRpcMsg *rpcMsg) { @@ -2038,6 +1991,7 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void pShow->numOfRows = pDb->numOfTables; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + mgmtDecDbRef(pDb); return 0; } @@ -2053,16 +2007,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, SDbObj *pDb = mgmtGetDb(pShow->db); if (pDb == NULL) return 0; - SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); - if (pUser == NULL) return 0; - - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && - strcmp(pUser->user, "monitor") != 0) { - return 0; - } - } - int32_t numOfRows = 0; SChildTableObj *pTable = NULL; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; @@ -2118,12 +2062,15 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, cols++; numOfRows++; + + mgmtDecTableRef(pTable); } pShow->numOfReads += numOfRows; const int32_t NUM_OF_COLUMNS = 4; mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + mgmtDecDbRef(pDb); return numOfRows; } diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index cedb32b44d..04b2a9ad2b 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -25,7 +25,7 @@ #include "mgmtShell.h" #include "mgmtUser.h" -static void * tsUserSdb = NULL; +static void * tsUserSdb = NULL; static int32_t tsUserUpdateSize = 0; static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void *pConn); @@ -44,7 +44,6 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { if (pAcct != NULL) { acctAddUser(pAcct, pUser); - acctDecRef(pAcct); } else { mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct); @@ -60,7 +59,6 @@ static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) { if (pAcct != NULL) { acctRemoveUser(pAcct, pUser); - acctDecRef(pAcct); } return TSDB_CODE_SUCCESS; @@ -426,7 +424,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) { mgmtSendSimpleResp(pMsg->thandle, code); } else { - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); + mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS); } mgmtDecUserRef(pUser); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index dd12128ccc..32dd6e5096 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -175,6 +175,14 @@ int32_t mgmtInitVgroups() { return 0; } +void mgmtIncVgroupRef(SVgObj *pVgroup) { + return sdbIncRef(tsVgroupSdb, pVgroup); +} + +void mgmtDecVgroupRef(SVgObj *pVgroup) { + return sdbDecRef(tsVgroupSdb, pVgroup); +} + SVgObj *mgmtGetVgroup(int32_t vgId) { return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); } @@ -183,15 +191,7 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } -void mgmtCreateVgroup(SQueuedMsg *pMsg) { - SDbObj *pDb = pMsg->pDb; - if (pDb == NULL) { - mError("failed to create vgroup, db not found"); - mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB); - mgmtFreeQueuedMsg(pMsg); - return; - } - +void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; @@ -548,7 +548,6 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); newMsg->msgType = queueMsg->msgType; newMsg->thandle = queueMsg->thandle; - newMsg->pDb = queueMsg->pDb; newMsg->pUser = queueMsg->pUser; newMsg->contLen = queueMsg->contLen; newMsg->pCont = rpcMallocCont(newMsg->contLen); @@ -632,7 +631,6 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); newMsg->msgType = queueMsg->msgType; newMsg->thandle = queueMsg->thandle; - newMsg->pDb = queueMsg->pDb; newMsg->pUser = queueMsg->pUser; newMsg->contLen = queueMsg->contLen; newMsg->pCont = rpcMallocCont(newMsg->contLen); -- GitLab