提交 2fd10c2a 编写于 作者: C cademfly

table

上级 28584839
......@@ -177,12 +177,16 @@ typedef enum _mgmt_table {
#define TSDB_ALTER_USER_SYSINFO 0xA
#define TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC 0xB
#define TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC 0xC
#define TSDB_ALTER_USER_ADD_READ_STABLE 0xD
#define TSDB_ALTER_USER_REMOVE_READ_STABLE 0xE
#define TSDB_ALTER_USER_ADD_WRITE_STABLE 0xF
#define TSDB_ALTER_USER_REMOVE_WRITE_STABLE 0x10
#define TSDB_ALTER_USER_ADD_ALL_STABLE 0x11
#define TSDB_ALTER_USER_REMOVE_ALL_STABLE 0x12
#define TSDB_ALTER_USER_ADD_READ_TABLE 0xD
#define TSDB_ALTER_USER_REMOVE_READ_TABLE 0xE
#define TSDB_ALTER_USER_ADD_WRITE_TABLE 0xF
#define TSDB_ALTER_USER_REMOVE_WRITE_TABLE 0x10
#define TSDB_ALTER_USER_ADD_ALL_TABLE 0x11
#define TSDB_ALTER_USER_REMOVE_ALL_TABLE 0x12
#define TSDB_ALTER_USER_ADD_READ_TAG 0x13
#define TSDB_ALTER_USER_REMOVE_READ_TAG 0x14
#define TSDB_ALTER_USER_ADD_WRITE_TAG 0x15
#define TSDB_ALTER_USER_REMOVE_WRITE_TAG 0x16
#define TSDB_ALTER_USER_PRIVILEGES 0x2
......@@ -706,6 +710,8 @@ typedef struct {
SHashObj* createdDbs;
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* readTbs;
SHashObj* writeTbs;
} SGetUserAuthRsp;
int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
......
......@@ -280,6 +280,8 @@ typedef struct {
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* topics;
SHashObj* readStbs;
SHashObj* writeStbs;
SRWLatch lock;
} SUserObj;
......
......@@ -21,8 +21,9 @@
#include "mndTopic.h"
#include "mndTrans.h"
#include "tbase64.h"
#include "mndStb.h"
#define USER_VER_NUMBER 2
#define USER_VER_NUMBER 3
#define USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
......@@ -124,8 +125,12 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
int32_t numOfReadStbs = taosHashGetSize(pUser->readStbs);
int32_t numOfWriteStbs = taosHashGetSize(pUser->writeStbs);
int32_t numOfTopics = taosHashGetSize(pUser->topics);
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN +
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE +
(numOfReadDbs + numOfWriteDbs ) * TSDB_DB_FNAME_LEN +
(numOfReadStbs + numOfWriteStbs) * TSDB_TABLE_FNAME_LEN +
numOfTopics * TSDB_TOPIC_FNAME_LEN;
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size);
......@@ -145,6 +150,8 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfReadStbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfWriteStbs, _OVER)
char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) {
......@@ -164,6 +171,18 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
topic = taosHashIterate(pUser->topics, topic);
}
char *stb = taosHashIterate(pUser->readStbs, NULL);
while (stb != NULL) {
SDB_SET_BINARY(pRaw, dataPos, stb, TSDB_TABLE_FNAME_LEN, _OVER);
stb = taosHashIterate(pUser->readStbs, stb);
}
stb = taosHashIterate(pUser->writeStbs, NULL);
while (stb != NULL) {
SDB_SET_BINARY(pRaw, dataPos, stb, TSDB_TABLE_FNAME_LEN, _OVER);
stb = taosHashIterate(pUser->writeStbs, stb);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -188,7 +207,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != 1 && sver != 2) {
if (sver != 1 && sver != 2 && sver != 3) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
......@@ -214,16 +233,25 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0;
int32_t numOfTopics = 0;
int32_t numOfReadStbs = 0;
int32_t numOfWriteStbs = 0;
SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER)
SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER)
if (sver >= 2) {
SDB_GET_INT32(pRaw, dataPos, &numOfTopics, _OVER)
}
if(sver >= 3){
SDB_GET_INT32(pRaw, dataPos, &numOfReadStbs, _OVER)
SDB_GET_INT32(pRaw, dataPos, &numOfWriteStbs, _OVER)
}
pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pUser->writeDbs =
taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pUser->topics = taosHashInit(numOfTopics, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pUser->readStbs = taosHashInit(numOfReadStbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pUser->writeStbs =
taosHashInit(numOfWriteStbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL || pUser->topics == NULL) goto _OVER;
for (int32_t i = 0; i < numOfReadDbs; ++i) {
......@@ -249,6 +277,22 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
}
}
if(sver >= 3){
for (int32_t i = 0; i < numOfReadStbs; ++i) {
char stb[TSDB_TABLE_FNAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, stb, TSDB_TABLE_FNAME_LEN, _OVER)
int32_t len = strlen(stb) + 1;
taosHashPut(pUser->readStbs, stb, len, stb, TSDB_DB_FNAME_LEN);
}
for (int32_t i = 0; i < numOfWriteStbs; ++i) {
char stb[TSDB_TABLE_FNAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, stb, TSDB_TABLE_FNAME_LEN, _OVER)
int32_t len = strlen(stb) + 1;
taosHashPut(pUser->writeStbs, stb, len, stb, TSDB_TABLE_FNAME_LEN);
}
}
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pUser->lock);
......@@ -261,6 +305,8 @@ _OVER:
taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs);
taosHashCleanup(pUser->topics);
taosHashCleanup(pUser->readStbs);
taosHashCleanup(pUser->writeStbs);
}
taosMemoryFreeClear(pRow);
return NULL;
......@@ -293,6 +339,8 @@ static int32_t mndUserDupObj(SUserObj *pUser, SUserObj *pNew) {
taosRLockLatch(&pUser->lock);
pNew->readDbs = mndDupDbHash(pUser->readDbs);
pNew->writeDbs = mndDupDbHash(pUser->writeDbs);
pNew->readStbs = mndDupTopicHash(pUser->readStbs);
pNew->writeStbs = mndDupTopicHash(pUser->writeStbs);
pNew->topics = mndDupTopicHash(pUser->topics);
taosRUnLockLatch(&pUser->lock);
......@@ -306,9 +354,13 @@ static void mndUserFreeObj(SUserObj *pUser) {
taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs);
taosHashCleanup(pUser->topics);
taosHashCleanup(pUser->readStbs);
taosHashCleanup(pUser->writeStbs);
pUser->readDbs = NULL;
pUser->writeDbs = NULL;
pUser->topics = NULL;
pUser->readStbs = NULL;
pUser->writeStbs = NULL;
}
static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
......@@ -328,6 +380,8 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
TSWAP(pOld->readDbs, pNew->readDbs);
TSWAP(pOld->writeDbs, pNew->writeDbs);
TSWAP(pOld->topics, pNew->topics);
TSWAP(pOld->readStbs, pNew->readStbs);
TSWAP(pOld->writeStbs, pNew->writeStbs);
taosWUnLockLatch(&pOld->lock);
return 0;
......@@ -637,6 +691,60 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_READ_TABLE || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_TABLE) {
if (strcmp(alterReq.tabName, "1.*") != 0) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", alterReq.objname, alterReq.tabName);
int32_t len = strlen(tbFName) + 1;
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseStb(pMnode, pStb);
goto _OVER;
}
if (taosHashPut(newUser.readStbs, tbFName, len, tbFName, TSDB_TABLE_NAME_LEN) != 0) {
mndReleaseStb(pMnode, pStb);
goto _OVER;
}
} else {
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
int32_t len = strlen(pStb->name) + 1;
taosHashPut(newUser.readStbs, pStb->name, len, pStb->name, TSDB_TABLE_NAME_LEN);
sdbRelease(pSdb, pStb);
}
}
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_TABLE || alterReq.alterType == TSDB_ALTER_USER_ADD_ALL_TABLE) {
if (strcmp(alterReq.tabName, "1.*") != 0) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", alterReq.objname, alterReq.tabName);
int32_t len = strlen(tbFName) + 1;
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseStb(pMnode, pStb);
goto _OVER;
}
if (taosHashPut(newUser.writeStbs, tbFName, len, tbFName, TSDB_TABLE_NAME_LEN) != 0) {
mndReleaseStb(pMnode, pStb);
goto _OVER;
}
} else {
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
int32_t len = strlen(pStb->name) + 1;
taosHashPut(newUser.writeStbs, pStb->name, len, pStb->name, TSDB_DB_FNAME_LEN);
sdbRelease(pSdb, pStb);
}
}
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC) {
int32_t len = strlen(alterReq.objname) + 1;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname);
......@@ -917,6 +1025,52 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
db = taosHashIterate(pUser->writeDbs, db);
}
char *stb = taosHashIterate(pUser->readStbs, NULL);
while (stb != NULL) {
cols = 0;
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(privilege, "read", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
char objName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(stb, &objName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
varDataSetLen(objName, strlen(&objName[VARSTR_HEADER_SIZE]));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);
numOfRows++;
stb = taosHashIterate(pUser->readStbs, stb);
}
stb = taosHashIterate(pUser->writeStbs, NULL);
while (stb != NULL) {
cols = 0;
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(privilege, "write", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)privilege, false);
char objName[TSDB_TABLE_NAME_LEN] = {0};
mndExtractTbNameFromStbFullName(stb, &objName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
varDataSetLen(objName, strlen(&objName[VARSTR_HEADER_SIZE]));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)objName, false);
numOfRows++;
stb = taosHashIterate(pUser->writeStbs, stb);
}
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
cols = 0;
......@@ -1039,6 +1193,8 @@ int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
if (inRead || inWrite) {
(void)taosHashRemove(newUser.readDbs, db, len);
(void)taosHashRemove(newUser.writeDbs, db, len);
(void)taosHashRemove(newUser.readStbs, db, len);
(void)taosHashRemove(newUser.writeStbs, db, len);
SSdbRaw *pCommitRaw = mndUserActionEncode(&newUser);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册