From 585d95e91c317fa7bbc4299a6a09e3066b5e9ed6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 13 Jun 2020 03:30:39 +0000 Subject: [PATCH] [TD-570] add user/db into sdb write queue --- src/mnode/inc/mnodeSdb.h | 5 ++- src/mnode/inc/mnodeUser.h | 1 + src/mnode/src/mnodeDb.c | 93 ++++++++++++++++++++------------------- src/mnode/src/mnodeSdb.c | 5 ++- src/mnode/src/mnodeUser.c | 67 ++++++++++++++-------------- 5 files changed, 92 insertions(+), 79 deletions(-) diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 88578f79ca..df9dbf843e 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -20,6 +20,8 @@ extern "C" { #endif +struct SMnodeMsg; + typedef enum { SDB_TABLE_DNODE = 0, SDB_TABLE_MNODE = 1, @@ -48,10 +50,11 @@ typedef struct { ESdbOper type; void * table; void * pObj; - void * pMnodeMsg; void * rowData; int32_t rowSize; int32_t retCode; // for callback in sdb queue + void (*cb)(struct SMnodeMsg *pMsg, int32_t code); + struct SMnodeMsg *pMsg; } SSdbOper; typedef struct { diff --git a/src/mnode/inc/mnodeUser.h b/src/mnode/inc/mnodeUser.h index 2ff0b4be81..073460f9d3 100644 --- a/src/mnode/inc/mnodeUser.h +++ b/src/mnode/inc/mnodeUser.h @@ -28,6 +28,7 @@ void * mnodeGetNextUser(void *pIter, SUserObj **pUser); void mnodeIncUserRef(SUserObj *pUser); void mnodeDecUserRef(SUserObj *pUser); SUserObj *mnodeGetUserFromConn(void *pConn); +char * mnodeGetUserFromMsg(void *pMnodeMsg); int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg); void mnodeDropAllUsers(SAcctObj *pAcct); diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 3666d61adc..21fcdc102c 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -41,7 +41,7 @@ static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; -static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate); +static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg); static int32_t mnodeDropDb(SMnodeMsg *newMsg); static int32_t mnodeSetDbDropping(SDbObj *pDb); static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); @@ -308,7 +308,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->replications < 0) pCfg->replications = tsReplications; } -static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { +static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMsg) { int32_t code = acctCheck(pAcct, ACCT_GRANT_DB); if (code != 0) return code; @@ -361,15 +361,17 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) { .table = tsDbSdb, .pObj = pDb, .rowSize = sizeof(SDbObj), + .pMsg = pMsg }; code = sdbInsertRow(&oper); if (code != TSDB_CODE_SUCCESS) { tfree(pDb); - code = TSDB_CODE_MND_SDB_ERROR; + return code; + } else { + mLPrint("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - return code; } bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { @@ -768,12 +770,7 @@ static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg) { } else if (!pMsg->pUser->writeAuth) { code = TSDB_CODE_MND_NO_RIGHTS; } else { - code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("db:%s, is created by %s", pCreate->db, pMsg->pUser->user); - } else { - mError("db:%s, failed to create, reason:%s", pCreate->db, tstrerror(code)); - } + code = mnodeCreateDb(pMsg->pUser->pAcct, pCreate, pMsg); } return code; @@ -890,7 +887,29 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return newCfg; } -static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { +static void mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { + if (code != TSDB_CODE_SUCCESS) return; + SDbObj *pDb = pMsg->pDb; + + void *pIter = NULL; + while (1) { + SVgObj *pVgroup = NULL; + pIter = mnodeGetNextVgroup(pIter, &pVgroup); + if (pVgroup == NULL) break; + if (pVgroup->pDb == pDb) { + mnodeSendCreateVgroupMsg(pVgroup, NULL); + } + mnodeDecVgroupRef(pVgroup); + } + sdbFreeIter(pIter); + + mTrace("db:%s, all vgroups is altered", pDb->name); + mLPrint("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + + balanceNotify(); +} + +static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) { SDbCfg newCfg = mnodeGetAlterDbOption(pDb, pAlter); if (terrno != TSDB_CODE_SUCCESS) { return terrno; @@ -901,37 +920,25 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) { return code; } - int32_t oldReplica = pDb->cfg.replications; - if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { pDb->cfg = newCfg; pDb->cfgVersion++; SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsDbSdb, - .pObj = pDb + .pObj = pDb, + .pMsg = pMsg, + .cb = mnodeAlterDbCb }; int32_t code = sdbUpdateRow(&oper); if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_MND_SDB_ERROR; + return code; + } else { + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } - void *pIter = NULL; - while (1) { - SVgObj *pVgroup = NULL; - pIter = mnodeGetNextVgroup(pIter, &pVgroup); - if (pVgroup == NULL) break; - mnodeSendCreateVgroupMsg(pVgroup, NULL); - mnodeDecVgroupRef(pVgroup); - } - sdbFreeIter(pIter); - - if (oldReplica != pDb->cfg.replications) { - balanceNotify(); - } - return TSDB_CODE_SUCCESS; } @@ -945,14 +952,7 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_INVALID_DB; } - int32_t code = mnodeAlterDb(pMsg->pDb, pAlter); - if (code != TSDB_CODE_SUCCESS) { - mError("db:%s, failed to alter, invalid db option", pAlter->db); - return code; - } - - mTrace("db:%s, all vgroups is altered", pMsg->pDb->name); - return TSDB_CODE_SUCCESS; + return mnodeAlterDb(pMsg->pDb, pAlter, pMsg); } static int32_t mnodeDropDb(SMnodeMsg *pMsg) { @@ -960,16 +960,19 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { mPrint("db:%s, drop db from sdb", pDb->name); SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsDbSdb, - .pObj = pDb + .pObj = pDb, + .pMsg = pMsg }; + int32_t code = sdbDeleteRow(&oper); - if (code != 0) { - code = TSDB_CODE_MND_SDB_ERROR; + if (code != TSDB_CODE_SUCCESS) { + return code; + } else { + mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - return code; } static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index d4daefe25f..380a992473 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -975,7 +975,10 @@ static void *sdbWorkerFp(void *param) { taosGetQitem(tsSdbWriteQall, &type, &item); if (type == TAOS_QTYPE_RPC) { pOper = (SSdbOper *)item; - dnodeSendRpcMnodeWriteRsp(pOper->pMnodeMsg, pOper->retCode); + if (pOper->cb) { + (*pOper->cb)(pOper->pMsg, pOper->retCode); + } + dnodeSendRpcMnodeWriteRsp(pOper->pMsg, pOper->retCode); } taosFreeQitem(item); } diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index 24b8a67dc4..57587fb65b 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -172,19 +172,21 @@ void mnodeDecUserRef(SUserObj *pUser) { return sdbDecRef(tsUserSdb, pUser); } -static int32_t mnodeUpdateUser(SUserObj *pUser) { +static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsUserSdb, - .pObj = pUser + .pObj = pUser, + .pMsg = pMsg }; int32_t code = sdbUpdateRow(&oper); if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_SDB_ERROR; + return code; + } else { + mLPrint("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - return code; } int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { @@ -225,11 +227,11 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { } SSdbOper oper = { - .type = SDB_OPER_GLOBAL, - .table = tsUserSdb, - .pObj = pUser, + .type = SDB_OPER_GLOBAL, + .table = tsUserSdb, + .pObj = pUser, .rowSize = sizeof(SUserObj), - .pMnodeMsg = pMsg + .pMsg = pMsg }; code = sdbInsertRow(&oper); @@ -237,23 +239,26 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { tfree(pUser); return code; } else { + mLPrint("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } -static int32_t mnodeDropUser(SUserObj *pUser) { +static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { SSdbOper oper = { - .type = SDB_OPER_GLOBAL, + .type = SDB_OPER_GLOBAL, .table = tsUserSdb, - .pObj = pUser + .pObj = pUser, + .pMsg = pMsg }; int32_t code = sdbDeleteRow(&oper); if (code != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_MND_SDB_ERROR; + return code; + } else { + mLPrint("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; } - - return code; } static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { @@ -361,22 +366,25 @@ SUserObj *mnodeGetUserFromConn(void *pConn) { } } +char *mnodeGetUserFromMsg(void *pMsg) { + SMnodeMsg *pMnodeMsg = pMsg; + if (pMnodeMsg != NULL &&pMnodeMsg->pUser != NULL) { + return pMnodeMsg->pUser->user; + } else { + return "system"; + } +} + static int32_t mnodeProcessCreateUserMsg(SMnodeMsg *pMsg) { - int32_t code; SUserObj *pOperUser = pMsg->pUser; if (pOperUser->superAuth) { SCMCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; - code = mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("user:%s, is created by %s", pCreate->user, pOperUser->user); - } + return mnodeCreateUser(pOperUser->pAcct, pCreate->user, pCreate->pass, pMsg); } else { mError("user:%s, no rights to create user", pOperUser->user); - code = TSDB_CODE_MND_NO_RIGHTS; + return TSDB_CODE_MND_NO_RIGHTS; } - - return code; } static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { @@ -413,8 +421,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { if (hasRight) { memset(pUser->pass, 0, sizeof(pUser->pass)); taosEncryptPass((uint8_t*)pAlter->pass, strlen(pAlter->pass), pUser->pass); - code = mnodeUpdateUser(pUser); - mLPrint("user:%s, password is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code)); + code = mnodeUpdateUser(pUser, pMsg); } else { mError("user:%s, no rights to alter user", pOperUser->user); code = TSDB_CODE_MND_NO_RIGHTS; @@ -454,8 +461,7 @@ static int32_t mnodeProcessAlterUserMsg(SMnodeMsg *pMsg) { pUser->writeAuth = 1; } - code = mnodeUpdateUser(pUser); - mLPrint("user:%s, privilege is altered by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code)); + code = mnodeUpdateUser(pUser, pMsg); } else { mError("user:%s, no rights to alter user", pOperUser->user); code = TSDB_CODE_MND_NO_RIGHTS; @@ -501,10 +507,7 @@ static int32_t mnodeProcessDropUserMsg(SMnodeMsg *pMsg) { } if (hasRight) { - code = mnodeDropUser(pUser); - if (code == TSDB_CODE_SUCCESS) { - mLPrint("user:%s, is dropped by %s, result:%s", pUser->user, pOperUser->user, tstrerror(code)); - } + code = mnodeDropUser(pUser, pMsg); } else { code = TSDB_CODE_MND_NO_RIGHTS; } -- GitLab