提交 04b02806 编写于 作者: S Shengliang Guan

enh: add topic privilege in mnode

上级 5998f72b
...@@ -71,6 +71,7 @@ typedef enum { ...@@ -71,6 +71,7 @@ typedef enum {
MND_OPER_READ_DB, MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB, MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES, MND_OPER_SHOW_VARIBALES,
MND_OPER_SUBSCRIBE,
} EOperType; } EOperType;
typedef enum { typedef enum {
...@@ -273,6 +274,7 @@ typedef struct { ...@@ -273,6 +274,7 @@ typedef struct {
int32_t authVersion; int32_t authVersion;
SHashObj* readDbs; SHashObj* readDbs;
SHashObj* writeDbs; SHashObj* writeDbs;
SHashObj* topics;
SRWLatch lock; SRWLatch lock;
} SUserObj; } SUserObj;
......
...@@ -28,6 +28,7 @@ void mndCleanupPrivilege(SMnode *pMnode); ...@@ -28,6 +28,7 @@ void mndCleanupPrivilege(SMnode *pMnode);
int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType); int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType);
int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType, SDbObj *pDb); int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType, SDbObj *pDb);
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname); int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname); int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter); int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp); int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
......
...@@ -28,6 +28,9 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType ...@@ -28,6 +28,9 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) { int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) {
return 0; return 0;
} }
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) {
return 0;
}
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) { int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) {
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN); memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1; pRsp->superAuth = 1;
......
...@@ -18,10 +18,11 @@ ...@@ -18,10 +18,11 @@
#include "mndDb.h" #include "mndDb.h"
#include "mndPrivilege.h" #include "mndPrivilege.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "tbase64.h" #include "tbase64.h"
#define USER_VER_NUMBER 1 #define USER_VER_NUMBER 2
#define USER_RESERVE_SIZE 64 #define USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode); static int32_t mndCreateDefaultUsers(SMnode *pMnode);
...@@ -36,6 +37,8 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq); ...@@ -36,6 +37,8 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq);
static int32_t mndProcessGetUserAuthReq(SRpcMsg *pReq); static int32_t mndProcessGetUserAuthReq(SRpcMsg *pReq);
static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter);
int32_t mndInitUser(SMnode *pMnode) { int32_t mndInitUser(SMnode *pMnode) {
SSdbTable table = { SSdbTable table = {
...@@ -119,7 +122,9 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { ...@@ -119,7 +122,9 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs); int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs); int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN; int32_t numOfTopics = taosHashGetSize(pUser->topics);
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN +
numOfTopics * TSDB_TOPIC_FNAME_LEN;
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
...@@ -137,6 +142,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { ...@@ -137,6 +142,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER) SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER)
char *db = taosHashIterate(pUser->readDbs, NULL); char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) { while (db != NULL) {
...@@ -150,6 +156,12 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { ...@@ -150,6 +156,12 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
db = taosHashIterate(pUser->writeDbs, db); db = taosHashIterate(pUser->writeDbs, db);
} }
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
SDB_SET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER);
db = taosHashIterate(pUser->topics, topic);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
...@@ -172,7 +184,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -172,7 +184,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != USER_VER_NUMBER) { if (sver != 1 && sver != 2) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER; goto _OVER;
} }
...@@ -197,12 +209,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -197,12 +209,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t numOfReadDbs = 0; int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0; int32_t numOfWriteDbs = 0;
int32_t numOfTopics = 0;
SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER) SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER)
SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER) SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER)
if (sver >= 2) {
SDB_GET_INT32(pRaw, dataPos, &numOfTopics, _OVER)
}
pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pUser->writeDbs = pUser->writeDbs =
taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto _OVER; pUser->topics = taosHashInit(numOfTopics, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL || pUser->topics == NULL) goto _OVER;
for (int32_t i = 0; i < numOfReadDbs; ++i) { for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0}; char db[TSDB_DB_FNAME_LEN] = {0};
...@@ -218,6 +236,15 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { ...@@ -218,6 +236,15 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN); taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
} }
if (sver >= 2) {
for (int32_t i = 0; i < numOfTopics; ++i) {
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER)
int32_t len = strlen(topic) + 1;
taosHashPut(pUser->topics, topic, len, topic, TSDB_TOPIC_FNAME_LEN);
}
}
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pUser->lock); taosInitRWLatch(&pUser->lock);
...@@ -228,6 +255,7 @@ _OVER: ...@@ -228,6 +255,7 @@ _OVER:
mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr()); mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr());
taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs); taosHashCleanup(pUser->writeDbs);
taosHashCleanup(pUser->topics);
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
return NULL; return NULL;
} }
...@@ -255,8 +283,10 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { ...@@ -255,8 +283,10 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser); mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser);
taosHashCleanup(pUser->readDbs); taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs); taosHashCleanup(pUser->writeDbs);
taosHashCleanup(pUser->topics);
pUser->readDbs = NULL; pUser->readDbs = NULL;
pUser->writeDbs = NULL; pUser->writeDbs = NULL;
pUser->topics = NULL;
return 0; return 0;
} }
...@@ -270,6 +300,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { ...@@ -270,6 +300,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
TSWAP(pOld->readDbs, pNew->readDbs); TSWAP(pOld->readDbs, pNew->readDbs);
TSWAP(pOld->writeDbs, pNew->writeDbs); TSWAP(pOld->writeDbs, pNew->writeDbs);
TSWAP(pOld->topics, pNew->topics);
taosWUnLockLatch(&pOld->lock); taosWUnLockLatch(&pOld->lock);
return 0; return 0;
...@@ -482,9 +513,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { ...@@ -482,9 +513,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
taosRLockLatch(&pUser->lock); taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs); newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs); newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
newUser.topics = mndDupDbHash(pUser->topics);
taosRUnLockLatch(&pUser->lock); taosRUnLockLatch(&pUser->lock);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL) { if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) {
goto _OVER; goto _OVER;
} }
...@@ -582,6 +614,26 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { ...@@ -582,6 +614,26 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
} }
} }
if (alterReq.alterType == TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC) {
int32_t len = strlen(alterReq.objname) + 1;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname);
if (pTopic == NULL) {
mndReleaseTopic(pMnode, pTopic);
goto _OVER;
}
taosHashPut(newUser.topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
}
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC) {
int32_t len = strlen(alterReq.objname) + 1;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname);
if (pTopic == NULL) {
mndReleaseTopic(pMnode, pTopic);
goto _OVER;
}
taosHashRemove(newUser.topics, alterReq.objname, len);
}
code = mndAlterUser(pMnode, pUser, &newUser, pReq); code = mndAlterUser(pMnode, pUser, &newUser, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
...@@ -594,6 +646,7 @@ _OVER: ...@@ -594,6 +646,7 @@ _OVER:
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
taosHashCleanup(newUser.writeDbs); taosHashCleanup(newUser.writeDbs);
taosHashCleanup(newUser.readDbs); taosHashCleanup(newUser.readDbs);
taosHashCleanup(newUser.topics);
return code; return code;
} }
...@@ -756,6 +809,10 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) { ...@@ -756,6 +809,10 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { return 0; }
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter) {}
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp, int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen) { int32_t *pRspLen) {
SUserAuthBatchRsp batchRsp = {0}; SUserAuthBatchRsp batchRsp = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册