提交 75b15760 编写于 作者: S slguan

[TD-93] add ref to sdb

上级 eb6f21bd
...@@ -169,7 +169,6 @@ typedef struct _db_obj { ...@@ -169,7 +169,6 @@ typedef struct _db_obj {
int8_t reserved[15]; int8_t reserved[15];
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount; int32_t refCount;
struct _db_obj *prev, *next;
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfTables; int32_t numOfTables;
int32_t numOfSuperTables; int32_t numOfSuperTables;
...@@ -222,7 +221,6 @@ typedef struct _acctObj { ...@@ -222,7 +221,6 @@ typedef struct _acctObj {
int8_t updateEnd[1]; int8_t updateEnd[1];
int32_t refCount; int32_t refCount;
SAcctInfo acctInfo; SAcctInfo acctInfo;
SDbObj * pHead;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SAcctObj; } SAcctObj;
...@@ -252,8 +250,10 @@ typedef struct { ...@@ -252,8 +250,10 @@ typedef struct {
void *ahandle; void *ahandle;
void *thandle; void *thandle;
void *pCont; void *pCont;
SDbObj *pDb;
SUserObj *pUser; SUserObj *pUser;
SDbObj *pDb;
SVgObj *pVgroup;
STableInfo *pTable;
} SQueuedMsg; } SQueuedMsg;
int32_t mgmtInitSystem(); int32_t mgmtInitSystem();
......
...@@ -15,5 +15,3 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -15,5 +15,3 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_LIBRARY(mnode ${SRC}) ADD_LIBRARY(mnode ${SRC})
TARGET_LINK_LIBRARIES(mnode trpc tutil pthread) TARGET_LINK_LIBRARIES(mnode trpc tutil pthread)
ENDIF () ENDIF ()
...@@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle); ...@@ -28,6 +28,8 @@ bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle); void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle); void mgmtFreeQhandle(void *qhandle);
void * mgmtMallocQueuedMsg(SRpcMsg *rpcMsg);
void * mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg);
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg); void mgmtFreeQueuedMsg(SQueuedMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -28,8 +28,8 @@ extern "C" { ...@@ -28,8 +28,8 @@ extern "C" {
int32_t mgmtInitTables(); int32_t mgmtInitTables();
void mgmtCleanUpTables(); void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char* tableId); STableInfo* mgmtGetTable(char* tableId);
void mgmtIncTableRef(STableInfo *pTable); void mgmtIncTableRef(void *pTable);
void mgmtDecTableRef(STableInfo *pTable); void mgmtDecTableRef(void *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb); void mgmtDropAllChildTables(SDbObj *pDropDb);
void mgmtDropAllSuperTables(SDbObj *pDropDb); void mgmtDropAllSuperTables(SDbObj *pDropDb);
......
...@@ -27,9 +27,11 @@ extern "C" { ...@@ -27,9 +27,11 @@ extern "C" {
int32_t mgmtInitVgroups(); int32_t mgmtInitVgroups();
void mgmtCleanUpVgroups(); void mgmtCleanUpVgroups();
SVgObj *mgmtGetVgroup(int32_t vgId); SVgObj *mgmtGetVgroup(int32_t vgId);
void mgmtIncVgroupRef(SVgObj *pVgroup);
void mgmtDecVgroupRef(SVgObj *pVgroup);
void mgmtDropAllVgroups(SDbObj *pDropDb); void mgmtDropAllVgroups(SDbObj *pDropDb);
void mgmtCreateVgroup(SQueuedMsg *pMsg); void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle); void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle);
SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb); SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb);
......
...@@ -38,56 +38,25 @@ int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUC ...@@ -38,56 +38,25 @@ int32_t acctCheck(SAcctObj *pAcct, EAcctGrantType type) { return TSDB_CODE_SUC
#endif #endif
void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) { void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex); atomic_add_fetch_32(&pAcct->acctInfo.numOfDbs, 1);
pDb->next = pAcct->pHead;
pDb->prev = NULL;
pDb->pAcct = pAcct; pDb->pAcct = pAcct;
if (pAcct->pHead) {
pAcct->pHead->prev = pDb;
}
pAcct->pHead = pDb;
pAcct->acctInfo.numOfDbs++;
pthread_mutex_unlock(&pAcct->mutex);
acctIncRef(pAcct); acctIncRef(pAcct);
} }
void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) { void acctRemoveDb(SAcctObj *pAcct, SDbObj *pDb) {
pthread_mutex_lock(&pAcct->mutex); atomic_sub_fetch_32(&pAcct->acctInfo.numOfDbs, 1);
if (pDb->prev) { pDb->pAcct = NULL;
pDb->prev->next = pDb->next;
}
if (pDb->next) {
pDb->next->prev = pDb->prev;
}
if (pDb->prev == NULL) {
pAcct->pHead = pDb->next;
}
pAcct->acctInfo.numOfDbs--;
pthread_mutex_unlock(&pAcct->mutex);
acctDecRef(pAcct); acctDecRef(pAcct);
} }
void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) { void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_lock(&pAcct->mutex); atomic_add_fetch_32(&pAcct->acctInfo.numOfUsers, 1);
pAcct->acctInfo.numOfUsers++;
pUser->pAcct = pAcct; pUser->pAcct = pAcct;
pthread_mutex_unlock(&pAcct->mutex);
acctIncRef(pAcct); acctIncRef(pAcct);
} }
void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) { void acctRemoveUser(SAcctObj *pAcct, SUserObj *pUser) {
pthread_mutex_lock(&pAcct->mutex); atomic_sub_fetch_32(&pAcct->acctInfo.numOfUsers, 1);
pAcct->acctInfo.numOfUsers--;
pUser->pAcct = NULL; pUser->pAcct = NULL;
pthread_mutex_unlock(&pAcct->mutex);
acctDecRef(pAcct); acctDecRef(pAcct);
} }
\ No newline at end of file
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
void * tsDbSdb = NULL; static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize; static int32_t tsDbUpdateSize;
static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
...@@ -55,8 +55,6 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) { ...@@ -55,8 +55,6 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
pDb->pHead = NULL; pDb->pHead = NULL;
pDb->pTail = NULL; pDb->pTail = NULL;
pDb->prev = NULL;
pDb->next = NULL;
pDb->numOfVgroups = 0; pDb->numOfVgroups = 0;
pDb->numOfTables = 0; pDb->numOfTables = 0;
pDb->numOfSuperTables = 0; pDb->numOfSuperTables = 0;
...@@ -288,8 +286,9 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { ...@@ -288,8 +286,9 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
return code; return code;
} }
SDbObj *pDb = (SDbObj *)sdbGetRow(tsDbSdb, pCreate->db); SDbObj *pDb = mgmtGetDb(pCreate->db);
if (pDb != NULL) { if (pDb != NULL) {
mgmtDecDbRef(pDb);
return TSDB_CODE_DB_ALREADY_EXIST; return TSDB_CODE_DB_ALREADY_EXIST;
} }
...@@ -517,16 +516,14 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) ...@@ -517,16 +516,14 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
} }
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs; pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs;
pShow->pNode = pUser->pAcct->pHead;
mgmtDecUserRef(pUser);
return 0; return 0;
} }
static char *mgmtGetDbStr(char *src) { static char *mgmtGetDbStr(char *src) {
char *pos = strstr(src, TS_PATH_DELIMITER); char *pos = strstr(src, TS_PATH_DELIMITER);
return ++pos; return ++pos;
} }
...@@ -539,14 +536,8 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -539,14 +536,8 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
while (numOfRows < rows) { while (numOfRows < rows) {
pDb = (SDbObj *)pShow->pNode; pShow->pNode = sdbFetchRow(tsDbSdb, pShow->pNode, (void **) &pDb);
if (pDb == NULL) break; if (pDb == NULL) break;
pShow->pNode = (void *)pDb->next;
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
if (strcmp(pUser->user, "root") != 0 && strcmp(pUser->user, "_root") != 0 && strcmp(pUser->user, "monitor") != 0 ) {
continue;
}
}
cols = 0; cols = 0;
...@@ -643,25 +634,31 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -643,25 +634,31 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols++; cols++;
numOfRows++; numOfRows++;
mgmtDecDbRef(pDb);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
mgmtDecUserRef(pUser);
return numOfRows; return numOfRows;
} }
void mgmtAddSuperTableIntoDb(SDbObj *pDb) { void mgmtAddSuperTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, 1); atomic_add_fetch_32(&pDb->numOfSuperTables, 1);
mgmtIncDbRef(pDb);
} }
void mgmtRemoveSuperTableFromDb(SDbObj *pDb) { void mgmtRemoveSuperTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfSuperTables, -1); atomic_add_fetch_32(&pDb->numOfSuperTables, -1);
mgmtDecDbRef(pDb);
} }
void mgmtAddTableIntoDb(SDbObj *pDb) { void mgmtAddTableIntoDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, 1); atomic_add_fetch_32(&pDb->numOfTables, 1);
mgmtIncDbRef(pDb);
} }
void mgmtRemoveTableFromDb(SDbObj *pDb) { void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32(&pDb->numOfTables, -1); atomic_add_fetch_32(&pDb->numOfTables, -1);
mgmtDecDbRef(pDb);
} }
static int32_t mgmtSetDbDirty(SDbObj *pDb) { static int32_t mgmtSetDbDirty(SDbObj *pDb) {
...@@ -800,15 +797,16 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { ...@@ -800,15 +797,16 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, invalid db option", pAlter->db); mError("db:%s, failed to alter, invalid db option", pAlter->db);
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
mgmtDecDbRef(pDb);
return;
} }
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
pMsg->pCont = NULL;
SVgObj *pVgroup = pDb->pHead; SVgObj *pVgroup = pDb->pHead;
if (pVgroup != NULL) { if (pVgroup != NULL) {
mPrint("vgroup:%d, will be altered", pVgroup->vgId); mPrint("vgroup:%d, will be altered", pVgroup->vgId);
SQueuedMsg *newMsg = malloc(sizeof(SQueuedMsg));
memcpy(newMsg, pMsg, sizeof(SQueuedMsg));
memset(pMsg, 0, sizeof(SQueuedMsg));
newMsg->ahandle = pVgroup; newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes; newMsg->expected = pVgroup->numOfVnodes;
mgmtAlterVgroup(pVgroup, newMsg); mgmtAlterVgroup(pVgroup, newMsg);
...@@ -817,9 +815,9 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) { ...@@ -817,9 +815,9 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
mTrace("db:%s, all vgroups is altered", pDb->name); mTrace("db:%s, all vgroups is altered", pDb->name);
mgmtSendSimpleResp(newMsg->thandle, TSDB_CODE_SUCCESS); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
rpcFreeCont(newMsg->pCont); rpcFreeCont(pMsg->pCont);
free(newMsg); mgmtDecDbRef(pDb);
} }
static void mgmtDropDb(void *handle, void *tmrId) { static void mgmtDropDb(void *handle, void *tmrId) {
...@@ -876,6 +874,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { ...@@ -876,6 +874,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("db:%s, can't drop monitor database", pDrop->db); mError("db:%s, can't drop monitor database", pDrop->db);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
mgmtDecDbRef(pDb);
return; return;
} }
...@@ -883,6 +882,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) { ...@@ -883,6 +882,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
mgmtSendSimpleResp(pMsg->thandle, code); mgmtSendSimpleResp(pMsg->thandle, code);
mgmtDecDbRef(pDb);
return; return;
} }
...@@ -919,6 +919,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) { ...@@ -919,6 +919,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
mgmtSetDbDirty(pDb); mgmtSetDbDirty(pDb);
numOfDbs++; numOfDbs++;
} }
mgmtDecDbRef(pDb);
} }
mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs); mTrace("acct:%s, all dbs is is set dirty", pAcct->user, numOfDbs);
......
...@@ -215,6 +215,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -215,6 +215,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
} }
mgmtDecVgroupRef(pVgroup);
} }
if (pDnode->status != TSDB_DN_STATUS_READY) { if (pDnode->status != TSDB_DN_STATUS_READY) {
......
...@@ -16,10 +16,13 @@ ...@@ -16,10 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "mgmtDb.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#include "mgmtVgroup.h"
int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg); int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg);
...@@ -763,12 +766,47 @@ int32_t mgmtInitProfile() { ...@@ -763,12 +766,47 @@ int32_t mgmtInitProfile() {
void mgmtCleanUpProfile() { void mgmtCleanUpProfile() {
} }
void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
bool usePublicIp = false;
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp);
if (pUser == NULL) {
return NULL;
}
SQueuedMsg *pMsg = calloc(1, sizeof(SQueuedMsg));
pMsg->thandle = rpcMsg->handle;
pMsg->msgType = rpcMsg->msgType;
pMsg->contLen = rpcMsg->contLen;
pMsg->pCont = rpcMsg->pCont;
pMsg->pUser = pUser;
pMsg->usePublicIp = usePublicIp;
return pMsg;
}
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) { void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
if (pMsg != NULL) { if (pMsg != NULL) {
if (pMsg->pCont != NULL) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
} if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
free(pMsg); free(pMsg);
} }
} }
void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
SQueuedMsg *pDestMsg = calloc(1, sizeof(SQueuedMsg));
pDestMsg->thandle = pSrcMsg->thandle;
pDestMsg->msgType = pSrcMsg->msgType;
pDestMsg->pCont = pSrcMsg->pCont;
pDestMsg->contLen = pSrcMsg->contLen;
pDestMsg->pUser = pSrcMsg->pUser;
pDestMsg->usePublicIp = pSrcMsg->usePublicIp;
pSrcMsg->pCont = NULL;
pSrcMsg->pUser = NULL;
return pDestMsg;
}
\ No newline at end of file
...@@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) { ...@@ -435,7 +435,7 @@ void sdbIncRef(void *handle, void *pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
atomic_add_fetch_32(pRefCount, 1); atomic_add_fetch_32(pRefCount, 1);
sdbTrace("table:%s, add ref:%d to record:%s", pTable->tableName, *pRefCount, sdbGetkeyStr(pTable, pRow)); sdbTrace("add ref to record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
} }
} }
...@@ -444,7 +444,7 @@ void sdbDecRef(void *handle, void *pRow) { ...@@ -444,7 +444,7 @@ void sdbDecRef(void *handle, void *pRow) {
SSdbTable *pTable = handle; SSdbTable *pTable = handle;
int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos); int32_t *pRefCount = (int32_t *)(pRow + pTable->refCountPos);
int32_t refCount = atomic_sub_fetch_32(pRefCount, 1); int32_t refCount = atomic_sub_fetch_32(pRefCount, 1);
sdbTrace("table:%s, def ref:%d from record:%s", pTable->tableName, *pRefCount, sdbGetkeyStr(pTable, pRow)); sdbTrace("def ref of record:%s:%s:%d", pTable->tableName, sdbGetkeyStr(pTable, pRow), *pRefCount);
if (refCount <= 0) { if (refCount <= 0) {
SSdbOperDesc oper = {.pObj = pRow}; SSdbOperDesc oper = {.pObj = pRow};
(*pTable->destroyFp)(&oper); (*pTable->destroyFp)(&oper);
......
...@@ -117,9 +117,7 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) { ...@@ -117,9 +117,7 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) {
void mgmtProcessTranRequest(SSchedMsg *sched) { void mgmtProcessTranRequest(SSchedMsg *sched) {
SQueuedMsg *queuedMsg = sched->msg; SQueuedMsg *queuedMsg = sched->msg;
(*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg); (*tsMgmtProcessShellMsgFp[queuedMsg->msgType])(queuedMsg);
mgmtDecUserRef(queuedMsg->pUser); mgmtFreeQueuedMsg(queuedMsg);
rpcFreeCont(queuedMsg->pCont);
free(queuedMsg);
} }
void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) {
...@@ -134,6 +132,12 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -134,6 +132,12 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
if (mgmtCheckRedirect(rpcMsg->handle)) {
// send resp in redirect func
rpcFreeCont(rpcMsg->pCont);
return;
}
if (!mgmtInServerStatus()) { if (!mgmtInServerStatus()) {
mgmtProcessMsgWhileNotReady(rpcMsg); mgmtProcessMsgWhileNotReady(rpcMsg);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -142,6 +146,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -142,6 +146,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) { if (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_GRANT_EXPIRED);
rpcFreeCont(rpcMsg->pCont);
return; return;
} }
...@@ -151,45 +156,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -151,45 +156,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
bool usePublicIp = false; SQueuedMsg *pMsg = mgmtMallocQueuedMsg(rpcMsg);
SUserObj *pUser = mgmtGetUserFromConn(rpcMsg->handle, &usePublicIp); if (pMsg == NULL) {
if (pUser == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_USER);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return; return;
} }
if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) { if (mgmtCheckMsgReadOnly(rpcMsg->msgType, rpcMsg->pCont)) {
SQueuedMsg queuedMsg = {0}; (*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(pMsg);
queuedMsg.thandle = rpcMsg->handle; mgmtFreeQueuedMsg(pMsg);
queuedMsg.msgType = rpcMsg->msgType; } else {
queuedMsg.contLen = rpcMsg->contLen; if (!pMsg->pUser->writeAuth) {
queuedMsg.pCont = rpcMsg->pCont; mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
queuedMsg.pUser = pUser; mgmtFreeQueuedMsg(pMsg);
queuedMsg.usePublicIp = usePublicIp;
(*tsMgmtProcessShellMsgFp[rpcMsg->msgType])(&queuedMsg);
mgmtDecUserRef(pUser);
rpcFreeCont(rpcMsg->pCont);
} else { } else {
SQueuedMsg *queuedMsg = calloc(1, sizeof(SQueuedMsg)); mgmtAddToShellQueue(pMsg);
queuedMsg->thandle = rpcMsg->handle; }
queuedMsg->msgType = rpcMsg->msgType;
queuedMsg->contLen = rpcMsg->contLen;
queuedMsg->pCont = rpcMsg->pCont;
queuedMsg->pUser = pUser;
queuedMsg->usePublicIp = usePublicIp;
mgmtAddToShellQueue(queuedMsg);
} }
} }
static void mgmtProcessShowMsg(SQueuedMsg *pMsg) { static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
SCMShowMsg *pShowMsg = pMsg->pCont; SCMShowMsg *pShowMsg = pMsg->pCont;
if (pShowMsg->type == TSDB_MGMT_TABLE_DNODE || TSDB_MGMT_TABLE_GRANTS || TSDB_MGMT_TABLE_SCORES) {
if (mgmtCheckRedirect(pMsg->thandle)) {
return;
}
}
if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) { if (pShowMsg->type >= TSDB_MGMT_TABLE_MAX) {
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_MSG_TYPE);
return; return;
......
此差异已折叠。
...@@ -44,7 +44,6 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) { ...@@ -44,7 +44,6 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
if (pAcct != NULL) { if (pAcct != NULL) {
acctAddUser(pAcct, pUser); acctAddUser(pAcct, pUser);
acctDecRef(pAcct);
} }
else { else {
mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct); mError("user:%s, acct:%s info not exist in sdb", pUser->user, pUser->acct);
...@@ -60,7 +59,6 @@ static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) { ...@@ -60,7 +59,6 @@ static int32_t mgmtUserActionDelete(SSdbOperDesc *pOper) {
if (pAcct != NULL) { if (pAcct != NULL) {
acctRemoveUser(pAcct, pUser); acctRemoveUser(pAcct, pUser);
acctDecRef(pAcct);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -175,6 +175,14 @@ int32_t mgmtInitVgroups() { ...@@ -175,6 +175,14 @@ int32_t mgmtInitVgroups() {
return 0; return 0;
} }
void mgmtIncVgroupRef(SVgObj *pVgroup) {
return sdbIncRef(tsVgroupSdb, pVgroup);
}
void mgmtDecVgroupRef(SVgObj *pVgroup) {
return sdbDecRef(tsVgroupSdb, pVgroup);
}
SVgObj *mgmtGetVgroup(int32_t vgId) { SVgObj *mgmtGetVgroup(int32_t vgId) {
return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId); return (SVgObj *)sdbGetRow(tsVgroupSdb, &vgId);
} }
...@@ -183,15 +191,7 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { ...@@ -183,15 +191,7 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return pDb->pHead; return pDb->pHead;
} }
void mgmtCreateVgroup(SQueuedMsg *pMsg) { void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
SDbObj *pDb = pMsg->pDb;
if (pDb == NULL) {
mError("failed to create vgroup, db not found");
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_DB);
mgmtFreeQueuedMsg(pMsg);
return;
}
SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj)); SVgObj *pVgroup = (SVgObj *)calloc(1, sizeof(SVgObj));
strcpy(pVgroup->dbName, pDb->name); strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->numOfVnodes = pDb->cfg.replications;
...@@ -548,7 +548,6 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -548,7 +548,6 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType; newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle; newMsg->thandle = queueMsg->thandle;
newMsg->pDb = queueMsg->pDb;
newMsg->pUser = queueMsg->pUser; newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen; newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen); newMsg->pCont = rpcMallocCont(newMsg->contLen);
...@@ -632,7 +631,6 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -632,7 +631,6 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg));
newMsg->msgType = queueMsg->msgType; newMsg->msgType = queueMsg->msgType;
newMsg->thandle = queueMsg->thandle; newMsg->thandle = queueMsg->thandle;
newMsg->pDb = queueMsg->pDb;
newMsg->pUser = queueMsg->pUser; newMsg->pUser = queueMsg->pUser;
newMsg->contLen = queueMsg->contLen; newMsg->contLen = queueMsg->contLen;
newMsg->pCont = rpcMallocCont(newMsg->contLen); newMsg->pCont = rpcMallocCont(newMsg->contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册