未验证 提交 0f051171 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18603 from taosdata/fix/TD-20848

enh: add topic privilege in mnode
......@@ -120,6 +120,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_APPS,
TSDB_MGMT_TABLE_STREAM_TASKS,
TSDB_MGMT_TABLE_PRIVILEGES,
TSDB_MGMT_TABLE_MAX,
} EShowType;
......
......@@ -71,6 +71,9 @@ typedef enum {
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES,
MND_OPER_SUBSCRIBE,
MND_OPER_CREATE_TOPIC,
MND_OPER_DROP_TOPIC,
} EOperType;
typedef enum {
......@@ -273,6 +276,7 @@ typedef struct {
int32_t authVersion;
SHashObj* readDbs;
SHashObj* writeDbs;
SHashObj* topics;
SRWLatch lock;
} SUserObj;
......
......@@ -28,6 +28,8 @@ void mndCleanupPrivilege(SMnode *pMnode);
int32_t mndCheckOperPrivilege(SMnode *pMnode, const char *user, EOperType operType);
int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType, SDbObj *pDb);
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname);
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic);
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName);
int32_t mndCheckShowPrivilege(SMnode *pMnode, const char *user, EShowType showType, const char *dbname);
int32_t mndCheckAlterUserPrivilege(SUserObj *pOperUser, SUserObj *pUser, SAlterUserReq *pAlter);
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp);
......
......@@ -25,7 +25,7 @@ extern "C" {
int32_t mndInitTopic(SMnode *pMnode);
void mndCleanupTopic(SMnode *pMnode);
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName);
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName);
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
......
......@@ -147,6 +147,8 @@ _OVER:
static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SAcctObj *pAcct = NULL;
SSdbRow *pRow = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -156,10 +158,10 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SAcctObj));
pRow = sdbAllocRow(sizeof(SAcctObj));
if (pRow == NULL) goto _OVER;
SAcctObj *pAcct = sdbGetRowObj(pRow);
pAcct = sdbGetRowObj(pRow);
if (pAcct == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -186,7 +188,7 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("acct:%s, failed to decode from raw:%p since %s", pAcct->acct, pRaw, terrstr());
mError("acct:%s, failed to decode from raw:%p since %s", pAcct == NULL ? "null" : pAcct->acct, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -157,6 +157,8 @@ _OVER:
static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SClusterObj *pCluster = NULL;
SSdbRow *pRow = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -166,10 +168,10 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SClusterObj));
pRow = sdbAllocRow(sizeof(SClusterObj));
if (pRow == NULL) goto _OVER;
SClusterObj *pCluster = sdbGetRowObj(pRow);
pCluster = sdbGetRowObj(pRow);
if (pCluster == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -184,7 +186,8 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster->id, pRaw, terrstr());
mError("cluster:%" PRId64 ", failed to decode from raw:%p since %s", pCluster == NULL ? 0 : pCluster->id, pRaw,
terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -712,7 +712,9 @@ CM_ENCODE_OVER:
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SSdbRow *pRow = NULL;
SMqConsumerObj *pConsumer = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
......@@ -722,10 +724,10 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
goto CM_DECODE_OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
pRow = sdbAllocRow(sizeof(SMqConsumerObj));
if (pRow == NULL) goto CM_DECODE_OVER;
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
pConsumer = sdbGetRowObj(pRow);
if (pConsumer == NULL) goto CM_DECODE_OVER;
int32_t dataPos = 0;
......@@ -745,7 +747,8 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
CM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s", pConsumer == NULL ? 0 : pConsumer->consumerId,
pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -147,6 +147,8 @@ _OVER:
static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SDbObj *pDb = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -156,10 +158,10 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SDbObj));
pRow = sdbAllocRow(sizeof(SDbObj));
if (pRow == NULL) goto _OVER;
SDbObj *pDb = sdbGetRowObj(pRow);
pDb = sdbGetRowObj(pRow);
if (pDb == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -232,7 +234,7 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("db:%s, failed to decode from raw:%p since %s", pDb->name, pRaw, terrstr());
mError("db:%s, failed to decode from raw:%p since %s", pDb == NULL ? "null" : pDb->name, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -154,8 +154,9 @@ _OVER:
}
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SSdbRow *pRow = NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SDnodeObj *pDnode = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -166,7 +167,8 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
pRow = sdbAllocRow(sizeof(SDnodeObj));
if (pRow == NULL) goto _OVER;
SDnodeObj *pDnode = sdbGetRowObj(pRow);
pDnode = sdbGetRowObj(pRow);
if (pDnode == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -181,7 +183,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("dnode:%d, failed to decode from raw:%p since %s", pDnode->id, pRaw, terrstr());
mError("dnode:%d, failed to decode from raw:%p since %s", pDnode == NULL ? 0 : pDnode->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -101,6 +101,8 @@ _OVER:
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SFuncObj *pFunc = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -110,10 +112,10 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SFuncObj));
pRow = sdbAllocRow(sizeof(SFuncObj));
if (pRow == NULL) goto _OVER;
SFuncObj *pFunc = sdbGetRowObj(pRow);
pFunc = sdbGetRowObj(pRow);
if (pFunc == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -148,7 +150,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("func:%s, failed to decode from raw:%p since %s", pFunc->name, pRaw, terrstr());
mError("func:%s, failed to decode from raw:%p since %s", pFunc == NULL ? "null" : pFunc->name, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -142,6 +142,8 @@ _OVER:
static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SMnodeObj *pObj = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
......@@ -151,10 +153,10 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
pRow = sdbAllocRow(sizeof(SMnodeObj));
if (pRow == NULL) goto _OVER;
SMnodeObj *pObj = sdbGetRowObj(pRow);
pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -167,7 +169,7 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
mError("mnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -28,6 +28,10 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
int32_t mndCheckDbPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *dbname) {
return 0;
}
int32_t mndCheckTopicPrivilege(SMnode *pMnode, const char *user, EOperType operType, SMqTopicObj *pTopic) { return 0; }
int32_t mndCheckTopicPrivilegeByName(SMnode *pMnode, const char *user, EOperType operType, const char *topicName) {
return 0;
}
int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp) {
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1;
......
......@@ -100,6 +100,8 @@ _OVER:
static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SQnodeObj *pObj = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -109,10 +111,10 @@ static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SQnodeObj));
pRow = sdbAllocRow(sizeof(SQnodeObj));
if (pRow == NULL) goto _OVER;
SQnodeObj *pObj = sdbGetRowObj(pRow);
pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -125,7 +127,7 @@ static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("qnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
mError("qnode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -132,6 +132,8 @@ _OVER:
static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SSmaObj *pSma = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -141,10 +143,10 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SSmaObj));
pRow = sdbAllocRow(sizeof(SSmaObj));
if (pRow == NULL) goto _OVER;
SSmaObj *pSma = sdbGetRowObj(pRow);
pSma = sdbGetRowObj(pRow);
if (pSma == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -200,7 +202,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("sma:%s, failed to decode from raw:%p since %s", pSma->name, pRaw, terrstr());
mError("sma:%s, failed to decode from raw:%p since %s", pSma == NULL ? "null" : pSma->name, pRaw, terrstr());
taosMemoryFreeClear(pSma->expr);
taosMemoryFreeClear(pSma->tagsFilter);
taosMemoryFreeClear(pSma->sql);
......
......@@ -105,6 +105,8 @@ _OVER:
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SSnodeObj *pObj = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -114,10 +116,10 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj));
pRow = sdbAllocRow(sizeof(SSnodeObj));
if (pRow == NULL) goto _OVER;
SSnodeObj *pObj = sdbGetRowObj(pRow);
pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -130,7 +132,7 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("snode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
mError("snode:%d, failed to decode from raw:%p since %s", pObj == NULL ? 0 : pObj->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -162,6 +162,8 @@ _OVER:
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SStbObj *pStb = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -171,10 +173,10 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SStbObj));
pRow = sdbAllocRow(sizeof(SStbObj));
if (pRow == NULL) goto _OVER;
SStbObj *pStb = sdbGetRowObj(pRow);
pStb = sdbGetRowObj(pRow);
if (pStb == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -254,10 +256,12 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("stb:%s, failed to decode from raw:%p since %s", pStb->name, pRaw, terrstr());
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
mError("stb:%s, failed to decode from raw:%p since %s", pStb == NULL ? "null" : pStb->name, pRaw, terrstr());
if (pStb != NULL) {
taosMemoryFreeClear(pStb->pColumns);
taosMemoryFreeClear(pStb->pTags);
taosMemoryFreeClear(pStb->comment);
}
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -120,7 +120,9 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SSdbRow *pRow = NULL;
SStreamObj *pStream = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto STREAM_DECODE_OVER;
......@@ -130,11 +132,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
goto STREAM_DECODE_OVER;
}
int32_t size = sizeof(SStreamObj);
SSdbRow *pRow = sdbAllocRow(size);
pRow = sdbAllocRow(sizeof(SStreamObj));
if (pRow == NULL) goto STREAM_DECODE_OVER;
SStreamObj *pStream = sdbGetRowObj(pRow);
pStream = sdbGetRowObj(pRow);
if (pStream == NULL) goto STREAM_DECODE_OVER;
int32_t tlen;
......@@ -157,7 +158,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
STREAM_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to decode from raw:%p since %s", pStream->name, pRaw, terrstr());
mError("stream:%s, failed to decode from raw:%p since %s", pStream == NULL ? "null" : pStream->name, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -743,7 +743,9 @@ SUB_ENCODE_OVER:
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SSdbRow *pRow = NULL;
SMqSubscribeObj *pSub = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER;
......@@ -753,11 +755,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
goto SUB_DECODE_OVER;
}
int32_t size = sizeof(SMqSubscribeObj);
SSdbRow *pRow = sdbAllocRow(size);
pRow = sdbAllocRow(sizeof(SMqSubscribeObj));
if (pRow == NULL) goto SUB_DECODE_OVER;
SMqSubscribeObj *pSub = sdbGetRowObj(pRow);
pSub = sdbGetRowObj(pRow);
if (pSub == NULL) goto SUB_DECODE_OVER;
int32_t dataPos = 0;
......@@ -777,7 +778,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
SUB_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub == NULL ? "null" : pSub->key, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -152,8 +152,10 @@ TOPIC_ENCODE_OVER:
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SMqTopicObj *pTopic = NULL;
void *buf = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TOPIC_DECODE_OVER;
......@@ -162,11 +164,10 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
goto TOPIC_DECODE_OVER;
}
int32_t size = sizeof(SMqTopicObj);
SSdbRow *pRow = sdbAllocRow(size);
pRow = sdbAllocRow(sizeof(SMqTopicObj));
if (pRow == NULL) goto TOPIC_DECODE_OVER;
SMqTopicObj *pTopic = sdbGetRowObj(pRow);
pTopic = sdbGetRowObj(pRow);
if (pTopic == NULL) goto TOPIC_DECODE_OVER;
int32_t len;
......@@ -251,7 +252,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
TOPIC_DECODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("topic:%s, failed to decode from raw:%p since %s", pTopic->name, pRaw, terrstr());
mError("topic:%s, failed to decode from raw:%p since %s", pTopic == NULL ? "null" : pTopic->name, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......@@ -288,7 +289,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
return 0;
}
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName) {
SSdb *pSdb = pMnode->pSdb;
SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
......@@ -573,7 +574,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pDb) != 0) {
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_TOPIC) != 0) {
goto _OVER;
}
......@@ -633,6 +634,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
}
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC) != 0) {
mndReleaseTopic(pMnode, pTopic);
return -1;
}
void *pIter = NULL;
SMqConsumerObj *pConsumer;
while (1) {
......
......@@ -18,10 +18,11 @@
#include "mndDb.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "tbase64.h"
#define USER_VER_NUMBER 1
#define USER_VER_NUMBER 2
#define USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
......@@ -36,6 +37,8 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq);
static int32_t mndProcessGetUserAuthReq(SRpcMsg *pReq);
static int32_t mndRetrieveUsers(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextUser(SMnode *pMnode, void *pIter);
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter);
int32_t mndInitUser(SMnode *pMnode) {
SSdbTable table = {
......@@ -56,6 +59,8 @@ int32_t mndInitUser(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_USER, mndRetrieveUsers);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_USER, mndCancelGetNextUser);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_PRIVILEGES, mndRetrievePrivileges);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_PRIVILEGES, mndCancelGetNextPrivileges);
return sdbSetTable(pMnode->pSdb, table);
}
......@@ -119,7 +124,9 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN;
int32_t numOfTopics = taosHashGetSize(pUser->topics);
int32_t size = sizeof(SUserObj) + USER_RESERVE_SIZE + (numOfReadDbs + numOfWriteDbs) * TSDB_DB_FNAME_LEN +
numOfTopics * TSDB_TOPIC_FNAME_LEN;
SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, USER_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER;
......@@ -137,6 +144,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfWriteDbs, _OVER)
SDB_SET_INT32(pRaw, dataPos, numOfTopics, _OVER)
char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) {
......@@ -150,6 +158,12 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
db = taosHashIterate(pUser->writeDbs, db);
}
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
SDB_SET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER);
db = taosHashIterate(pUser->topics, topic);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
......@@ -168,19 +182,21 @@ _OVER:
static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SUserObj *pUser = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != USER_VER_NUMBER) {
if (sver != 1 && sver != 2) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SUserObj));
pRow = sdbAllocRow(sizeof(SUserObj));
if (pRow == NULL) goto _OVER;
SUserObj *pUser = sdbGetRowObj(pRow);
pUser = sdbGetRowObj(pRow);
if (pUser == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -197,12 +213,18 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t numOfReadDbs = 0;
int32_t numOfWriteDbs = 0;
int32_t numOfTopics = 0;
SDB_GET_INT32(pRaw, dataPos, &numOfReadDbs, _OVER)
SDB_GET_INT32(pRaw, dataPos, &numOfWriteDbs, _OVER)
if (sver >= 2) {
SDB_GET_INT32(pRaw, dataPos, &numOfTopics, _OVER)
}
pUser->readDbs = taosHashInit(numOfReadDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
pUser->writeDbs =
taosHashInit(numOfWriteDbs, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL) goto _OVER;
pUser->topics = taosHashInit(numOfTopics, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pUser->readDbs == NULL || pUser->writeDbs == NULL || pUser->topics == NULL) goto _OVER;
for (int32_t i = 0; i < numOfReadDbs; ++i) {
char db[TSDB_DB_FNAME_LEN] = {0};
......@@ -218,6 +240,15 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
taosHashPut(pUser->writeDbs, db, len, db, TSDB_DB_FNAME_LEN);
}
if (sver >= 2) {
for (int32_t i = 0; i < numOfTopics; ++i) {
char topic[TSDB_TOPIC_FNAME_LEN] = {0};
SDB_GET_BINARY(pRaw, dataPos, topic, TSDB_TOPIC_FNAME_LEN, _OVER)
int32_t len = strlen(topic) + 1;
taosHashPut(pUser->topics, topic, len, topic, TSDB_TOPIC_FNAME_LEN);
}
}
SDB_GET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)
taosInitRWLatch(&pUser->lock);
......@@ -225,9 +256,12 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("user:%s, failed to decode from raw:%p since %s", pUser->user, pRaw, terrstr());
taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs);
mError("user:%s, failed to decode from raw:%p since %s", pUser == NULL ? "null" : pUser->user, pRaw, terrstr());
if (pUser != NULL) {
taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs);
taosHashCleanup(pUser->topics);
}
taosMemoryFreeClear(pRow);
return NULL;
}
......@@ -255,8 +289,10 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
mTrace("user:%s, perform delete action, row:%p", pUser->user, pUser);
taosHashCleanup(pUser->readDbs);
taosHashCleanup(pUser->writeDbs);
taosHashCleanup(pUser->topics);
pUser->readDbs = NULL;
pUser->writeDbs = NULL;
pUser->topics = NULL;
return 0;
}
......@@ -270,6 +306,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) {
memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN);
TSWAP(pOld->readDbs, pNew->readDbs);
TSWAP(pOld->writeDbs, pNew->writeDbs);
TSWAP(pOld->topics, pNew->topics);
taosWUnLockLatch(&pOld->lock);
return 0;
......@@ -482,9 +519,10 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
taosRLockLatch(&pUser->lock);
newUser.readDbs = mndDupDbHash(pUser->readDbs);
newUser.writeDbs = mndDupDbHash(pUser->writeDbs);
newUser.topics = mndDupDbHash(pUser->topics);
taosRUnLockLatch(&pUser->lock);
if (newUser.readDbs == NULL || newUser.writeDbs == NULL) {
if (newUser.readDbs == NULL || newUser.writeDbs == NULL || newUser.topics == NULL) {
goto _OVER;
}
......@@ -582,6 +620,26 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
}
}
if (alterReq.alterType == TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC) {
int32_t len = strlen(alterReq.objname) + 1;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname);
if (pTopic == NULL) {
mndReleaseTopic(pMnode, pTopic);
goto _OVER;
}
taosHashPut(newUser.topics, pTopic->name, len, pTopic->name, TSDB_TOPIC_FNAME_LEN);
}
if (alterReq.alterType == TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC) {
int32_t len = strlen(alterReq.objname) + 1;
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, alterReq.objname);
if (pTopic == NULL) {
mndReleaseTopic(pMnode, pTopic);
goto _OVER;
}
taosHashRemove(newUser.topics, alterReq.objname, len);
}
code = mndAlterUser(pMnode, pUser, &newUser, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
......@@ -594,6 +652,7 @@ _OVER:
mndReleaseUser(pMnode, pUser);
taosHashCleanup(newUser.writeDbs);
taosHashCleanup(newUser.readDbs);
taosHashCleanup(newUser.topics);
return code;
}
......@@ -756,6 +815,108 @@ static void mndCancelGetNextUser(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SUserObj *pUser = NULL;
int32_t cols = 0;
char *pWrite;
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_USER, pShow->pIter, (void **)&pUser);
if (pShow->pIter == NULL) break;
int32_t numOfReadDbs = taosHashGetSize(pUser->readDbs);
int32_t numOfWriteDbs = taosHashGetSize(pUser->writeDbs);
int32_t numOfTopics = taosHashGetSize(pUser->topics);
if (numOfRows + numOfReadDbs + numOfWriteDbs + numOfTopics >= rows) break;
char *db = taosHashIterate(pUser->readDbs, NULL);
while (db != NULL) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(privilege, "read", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)privilege, false);
SName name = {0};
char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(objName));
varDataSetLen(objName, strlen(varDataVal(objName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)objName, false);
numOfRows++;
db = taosHashIterate(pUser->readDbs, db);
}
db = taosHashIterate(pUser->writeDbs, NULL);
while (db != NULL) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(privilege, "write", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)privilege, false);
SName name = {0};
char objName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(objName));
varDataSetLen(objName, strlen(varDataVal(objName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)objName, false);
numOfRows++;
db = taosHashIterate(pUser->writeDbs, db);
}
char *topic = taosHashIterate(pUser->topics, NULL);
while (topic != NULL) {
cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char userName[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(userName, pUser->user, pShow->pMeta->pSchemas[cols].bytes);
colDataAppend(pColInfo, numOfRows, (const char *)userName, false);
char privilege[20] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(privilege, "subscribe", pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)privilege, false);
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
tstrncpy(varDataVal(topicName), mndGetDbStr(topic), TSDB_TOPIC_NAME_LEN - 2);
varDataSetLen(topicName, strlen(varDataVal(topicName)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
numOfRows++;
db = taosHashIterate(pUser->topics, topic);
}
sdbRelease(pSdb, pUser);
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextPrivileges(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen) {
SUserAuthBatchRsp batchRsp = {0};
......
......@@ -113,6 +113,8 @@ _OVER:
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SVgObj *pVgroup = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
......@@ -122,10 +124,10 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj));
pRow = sdbAllocRow(sizeof(SVgObj));
if (pRow == NULL) goto _OVER;
SVgObj *pVgroup = sdbGetRowObj(pRow);
pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OVER;
int32_t dataPos = 0;
......@@ -152,7 +154,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
_OVER:
if (terrno != 0) {
mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup->vgId, pRaw, terrstr());
mError("vgId:%d, failed to decode from raw:%p since %s", pVgroup == NULL ? 0 : pVgroup->vgId, pRaw, terrstr());
taosMemoryFreeClear(pRow);
return NULL;
}
......
......@@ -5,6 +5,10 @@ sql connect
print =============== create db
sql create database d1 vgroups 1;
sql use d1
sql create table stb (ts timestamp, i int) tags (j int)
sql create topic topic_1 as select ts, i from stb
sql create database d2 vgroups 1;
sql create database d3 vgroups 1;
sql select * from information_schema.ins_databases
......@@ -89,5 +93,6 @@ sql_error drop database d2;
sql_error create stable d1.st (ts timestamp, i int) tags (j int)
sql create stable d2.st (ts timestamp, i int) tags (j int)
sql_error create topic topic_2 as select ts, i from stb
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -62,7 +62,7 @@ class TDTestCase:
# os.system(f"mv {self.getBuildPath()}/build/lib/libtaos.so.1 {self.getBuildPath()}/build/lib/libtaos.so.1_bak ")
packagePath = "/usr/local/src/"
dataPath = cPath + "/../data/"
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz"
packageTPath = packageName.split("-Linux-")[0]
my_file = Path(f"{packagePath}/{packageName}")
......@@ -71,22 +71,21 @@ class TDTestCase:
os.system(f"cd {packagePath} && wget https://www.tdengine.com/assets-download/3.0/{packageName}")
else:
print(f"{packageName} has been exists")
os.system(f"cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " )
os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " )
tdDnodes.stop(1)
print(f"start taosd: nohup taosd -c {cPath} & ")
os.system(f" nohup taosd -c {cPath} & " )
print(f"start taosd: rm -rf {dataPath}/* && nohup taosd -c {cPath} & ")
os.system(f"rm -rf {dataPath}/* && nohup taosd -c {cPath} & " )
sleep(5)
def buildTaosd(self,bPath):
# os.system(f"mv {bPath}/build_bak {bPath}/build ")
os.system(f" cd {bPath} && make install ")
def run(self):
bPath=self.getBuildPath()
cPath=self.getCfgPath()
bPath = self.getBuildPath()
cPath = self.getCfgPath()
dbname = "test"
stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath)
......@@ -94,6 +93,7 @@ class TDTestCase:
tableNumbers=100
recordNumbers1=100
recordNumbers2=1000
# tdsqlF=tdCom.newTdSql()
# print(tdsqlF)
# tdsqlF.query(f"SELECT SERVER_VERSION();")
......@@ -105,6 +105,7 @@ class TDTestCase:
# oldClientVersion=tdsqlF.queryResult[0][0]
# tdLog.info(f"Base client version is {oldClientVersion}")
# baseVersion = "3.0.1.8"
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册