diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h index 4a80f619d3026c82abeb2c2e9c777073dbf2fa6a..5d13f8b74dd88e6304b309b6e18a9f943288ae45 100644 --- a/source/dnode/mnode/impl/inc/mndSma.h +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -26,6 +26,8 @@ int32_t mndInitSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); +int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); +int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index cfe363fdf0e1b3f90410566d676a9de17567bfb8..43263af5733d89cca8cac82b1a584b6e07d31cc1 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -935,6 +935,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER; + if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index c19b558f198913a88def6c172d8427ad28f7fd2b..cc57076c0a04c4f300a782a66704e941d5f650f1 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -38,10 +38,10 @@ static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma); static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups); -static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); -static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); +static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq); +static int32_t mndProcessDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); -static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); +static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); @@ -56,8 +56,8 @@ int32_t mndInitSma(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSmaActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropSmaReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); @@ -79,7 +79,6 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) @@ -100,6 +99,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER) + if (pSma->exprLen > 0) { SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) } @@ -115,6 +115,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) + terrno = 0; _OVER: @@ -193,6 +194,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { } SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) + terrno = 0; _OVER: @@ -383,6 +385,25 @@ static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, S return 0; } +static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { + SStbObj stbObj = {0}; + taosRLockLatch(&pStb->lock); + memcpy(&stbObj, pStb, sizeof(SStbObj)); + taosRUnLockLatch(&pStb->lock); + stbObj.pColumns = NULL; + stbObj.pTags = NULL; + stbObj.updateTime = taosGetTimestampMs(); + stbObj.lock = 0; + stbObj.smaVer++; + + SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; @@ -457,7 +478,6 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, pSma->schemaTag.pSchema[0].flags = 0; snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId"); - int32_t smaContLen = 0; void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen); if (pSmaReq == NULL) return -1; @@ -559,6 +579,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; @@ -599,7 +620,7 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { return 0; } -static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq) { +static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SStbObj *pStb = NULL; @@ -781,13 +802,17 @@ static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SD } static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { - int32_t code = -1; - SVgObj *pVgroup = NULL; - STrans *pTrans = NULL; + int32_t code = -1; + SVgObj *pVgroup = NULL; + SStbObj *pStb = NULL; + STrans *pTrans = NULL; pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVgroup == NULL) goto _OVER; + pStb = mndAcquireStb(pMnode, pSma->stb); + if (pStb == NULL) goto _OVER; + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; @@ -798,6 +823,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -807,10 +833,78 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p _OVER: mndTransDrop(pTrans); mndReleaseVgroup(pMnode, pVgroup); + mndReleaseStb(pMnode, pStb); + return code; +} + +int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->stbUid == pStb->uid) { + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; + mndReleaseVgroup(pMnode, pVgroup); + pVgroup = NULL; + } + + sdbRelease(pSdb, pSma); + } + + code = 0; + +_OVER: + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pSma); + mndReleaseVgroup(pMnode, pVgroup); + return code; +} + +int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->dbUid == pDb->uid) { + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + mndReleaseVgroup(pMnode, pVgroup); + pVgroup = NULL; + } + + sdbRelease(pSdb, pSma); + } + + code = 0; + +_OVER: + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pSma); + mndReleaseVgroup(pMnode, pVgroup); return code; } -static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq) { +static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SUserObj *pUser = NULL; @@ -900,10 +994,10 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp } static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) { - int32_t code = 0; - SSmaObj *pSma = NULL; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + int32_t code = 0; + SSmaObj *pSma = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; STableIndexInfo info; while (1) { @@ -922,14 +1016,14 @@ static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIn info.dstTbUid = pSma->dstTbUid; info.dstVgId = pSma->dstVgId; - SVgObj* pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); + SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVg == NULL) { code = -1; sdbRelease(pSdb, pSma); return code; } info.epSet = mndGetVgroupEpset(pMnode, pVg); - + info.expr = taosMemoryMalloc(pSma->exprLen + 1); if (info.expr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -953,7 +1047,7 @@ static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIn sdbRelease(pSdb, pSma); } - + return code; } @@ -1005,10 +1099,10 @@ _OVER: static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) { STableIndexReq indexReq = {0}; - SMnode *pMnode = pReq->info.node; - int32_t code = -1; + SMnode *pMnode = pReq->info.node; + int32_t code = -1; STableIndexRsp rsp = {0}; - bool exist = false; + bool exist = false; if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -1055,7 +1149,6 @@ _OVER: return code; } - static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ad0c913fb8c27470fc8b27fbb32fff9e3c932a56..cb0a893436fa45beb8e5aed0e9a5fc2fba4758c3 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -23,6 +23,7 @@ #include "mndPerfSchema.h" #include "mndScheduler.h" #include "mndShow.h" +#include "mndSma.h" #include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" @@ -36,9 +37,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); -static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq); -static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq); -static int32_t mndProcessMDropStbReq(SRpcMsg *pReq); +static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); +static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); +static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); @@ -54,9 +55,9 @@ int32_t mndInitStb(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndStbActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); @@ -318,6 +319,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->tagVer = pNew->tagVer; pOld->colVer = pNew->colVer; + pOld->smaVer = pNew->smaVer; pOld->nextColId = pNew->nextColId; pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; @@ -361,7 +363,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { return mndAcquireDb(pMnode, db); } -static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) { +static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) { if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) { return -1; } else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) { @@ -395,14 +397,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, - req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, + STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, - req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, + STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } @@ -761,7 +763,7 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p return 0; } -static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq) { +static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SStbObj *pStb = NULL; @@ -1296,7 +1298,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) { - int ret; + int32_t ret; SEncoder ec = {0}; uint32_t contLen = 0; SMAlterStbRsp alterRsp = {0}; @@ -1415,7 +1417,7 @@ _OVER: return code; } -static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) { +static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SDbObj *pDb = NULL; @@ -1545,6 +1547,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; + if (mndDropSmasByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -1554,7 +1557,7 @@ _OVER: return code; } -static int32_t mndProcessMDropStbReq(SRpcMsg *pReq) { +static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SUserObj *pUser = NULL;