提交 585d95e9 编写于 作者: S Shengliang Guan

[TD-570] add user/db into sdb write queue

上级 357384f0
......@@ -20,6 +20,8 @@
extern "C" {
#endif
struct SMnodeMsg;
typedef enum {
SDB_TABLE_DNODE = 0,
SDB_TABLE_MNODE = 1,
......@@ -48,10 +50,11 @@ typedef struct {
ESdbOper type;
void * table;
void * pObj;
void * pMnodeMsg;
void * rowData;
int32_t rowSize;
int32_t retCode; // for callback in sdb queue
void (*cb)(struct SMnodeMsg *pMsg, int32_t code);
struct SMnodeMsg *pMsg;
} SSdbOper;
typedef struct {
......
......@@ -28,6 +28,7 @@ void * mnodeGetNextUser(void *pIter, SUserObj **pUser);
void mnodeIncUserRef(SUserObj *pUser);
void mnodeDecUserRef(SUserObj *pUser);
SUserObj *mnodeGetUserFromConn(void *pConn);
char * mnodeGetUserFromMsg(void *pMnodeMsg);
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg);
void mnodeDropAllUsers(SAcctObj *pAcct);
......
......@@ -41,7 +41,7 @@
static void * tsDbSdb = NULL;
static int32_t tsDbUpdateSize;
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate);
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg);
static int32_t mnodeDropDb(SMnodeMsg *newMsg);
static int32_t mnodeSetDbDropping(SDbObj *pDb);
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
......@@ -308,7 +308,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->replications < 0) pCfg->replications = tsReplications;
}
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg) {
int32_t code = acctCheck(pAcct, ACCT_GRANT_DB);
if (code != 0) return code;
......@@ -361,15 +361,17 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
.table = tsDbSdb,
.pObj = pDb,
.rowSize = sizeof(SDbObj),
.pMsg = pMsg
};
code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pDb);
code = TSDB_CODE_MND_SDB_ERROR;
return code;
} else {
mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
......@@ -768,12 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) {
} else if (!pMsg->pUser->writeAuth) {
code = TSDB_CODE_MND_NO_RIGHTS;
} else {
code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user);
} else {
mError("db:%s, failed to create, reason:%s", pCreate->db, tstrerror(code));
}
code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate, pMsg);
}
return code;
......@@ -890,7 +887,29 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return newCfg;
}
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
static void mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) return;
SDbObj *pDb = pMsg->pDb;
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) {
mnodeSendCreateVgroupMsg(pVgroup, NULL);
}
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
mTrace("db:%s, all vgroups is altered", pDb->name);
mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
balanceNotify();
}
static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
SDbCfg newCfg = mnodeGetAlterDbOption(pDb, pAlter);
if (terrno != TSDB_CODE_SUCCESS) {
return terrno;
......@@ -901,37 +920,25 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
return code;
}
int32_t oldReplica = pDb->cfg.replications;
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg;
pDb->cfgVersion++;
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb
.pObj = pDb,
.pMsg = pMsg,
.cb = mnodeAlterDbCb
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_SDB_ERROR;
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
mnodeSendCreateVgroupMsg(pVgroup, NULL);
mnodeDecVgroupRef(pVgroup);
}
sdbFreeIter(pIter);
if (oldReplica != pDb->cfg.replications) {
balanceNotify();
}
return TSDB_CODE_SUCCESS;
}
......@@ -945,14 +952,7 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
return TSDB_CODE_MND_INVALID_DB;
}
int32_t code = mnodeAlterDb(pMsg->pDb, pAlter);
if (code != TSDB_CODE_SUCCESS) {
mError("db:%s, failed to alter, invalid db option", pAlter->db);
return code;
}
mTrace("db:%s, all vgroups is altered", pMsg->pDb->name);
return TSDB_CODE_SUCCESS;
return mnodeAlterDb(pMsg->pDb, pAlter, pMsg);
}
static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
......@@ -960,16 +960,19 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
mPrint("db:%s, drop db from sdb", pDb->name);
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsDbSdb,
.pObj = pDb
.pObj = pDb,
.pMsg = pMsg
};
int32_t code = sdbDeleteRow(&oper);
if (code != 0) {
code = TSDB_CODE_MND_SDB_ERROR;
if (code != TSDB_CODE_SUCCESS) {
return code;
} else {
mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
......
......@@ -975,7 +975,10 @@ static void *sdbWorkerFp(void *param) {
taosGetQitem(tsSdbWriteQall, &type, &item);
if (type == TAOS_QTYPE_RPC) {
pOper = (SSdbOper *)item;
dnodeSendRpcMnodeWriteRsp(pOper->pMnodeMsg, pOper->retCode);
if (pOper->cb) {
(*pOper->cb)(pOper->pMsg, pOper->retCode);
}
dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode);
}
taosFreeQitem(item);
}
......
......@@ -172,19 +172,21 @@ void mnodeDecUserRef(SUserObj *pUser) {
return sdbDecRef(tsUserSdb, pUser);
}
static int32_t mnodeUpdateUser(SUserObj *pUser) {
static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser
.pObj = pUser,
.pMsg = pMsg
};
int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_SDB_ERROR;
return code;
} else {
mLPrint("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
......@@ -225,11 +227,11 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
}
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser,
.rowSize = sizeof(SUserObj),
.pMnodeMsg = pMsg
.pMsg = pMsg
};
code = sdbInsertRow(&oper);
......@@ -237,23 +239,26 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
tfree(pUser);
return code;
} else {
mLPrint("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
static int32_t mnodeDropUser(SUserObj *pUser) {
static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.type = SDB_OPER_GLOBAL,
.table = tsUserSdb,
.pObj = pUser
.pObj = pUser,
.pMsg = pMsg
};
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_SDB_ERROR;
return code;
} else {
mLPrint("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
}
static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......@@ -361,22 +366,25 @@ SUserObj *mnodeGetUserFromConn(void *pConn) {
}
}
char *mnodeGetUserFromMsg(void *pMsg) {
SMnodeMsg *pMnodeMsg = pMsg;
if (pMnodeMsg != NULL &&pMnodeMsg->pUser != NULL) {
return pMnodeMsg->pUser->user;
} else {
return "system";
}
}
static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) {
int32_t code;
SUserObj *pOperUser = pMsg->pUser;
if (pOperUser->superAuth) {
SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user);
}
return mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg);
} else {
mError("user:%s, no rights to create user", pOperUser->user);
code = TSDB_CODE_MND_NO_RIGHTS;
return TSDB_CODE_MND_NO_RIGHTS;
}
return code;
}
static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
......@@ -413,8 +421,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
if (hasRight) {
memset(pUser->pass, 0, sizeof(pUser->pass));
taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass);
code = mnodeUpdateUser(pUser);
mLPrint("user:%s, password is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code));
code = mnodeUpdateUser(pUser, pMsg);
} else {
mError("user:%s, no rights to alter user", pOperUser->user);
code = TSDB_CODE_MND_NO_RIGHTS;
......@@ -454,8 +461,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) {
pUser->writeAuth = 1;
}
code = mnodeUpdateUser(pUser);
mLPrint("user:%s, privilege is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code));
code = mnodeUpdateUser(pUser, pMsg);
} else {
mError("user:%s, no rights to alter user", pOperUser->user);
code = TSDB_CODE_MND_NO_RIGHTS;
......@@ -501,10 +507,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) {
}
if (hasRight) {
code = mnodeDropUser(pUser);
if (code == TSDB_CODE_SUCCESS) {
mLPrint("user:%s, is dropped by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code));
}
code = mnodeDropUser(pUser, pMsg);
} else {
code = TSDB_CODE_MND_NO_RIGHTS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册