diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index a01355c099d16f298ba6cf4330b6ab2714e47473..e1c068d88ac5dcc8f5094ea3cd5f0eae9032f509 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -25,7 +25,7 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SMnode SMnode; -typedef struct SMnodeMsg SMnodeMsg; +typedef struct SMndMsg SMndMsg; typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); @@ -33,7 +33,7 @@ typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef struct SMnodeMsg { +typedef struct SMndMsg { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; int32_t acctId; @@ -42,7 +42,7 @@ typedef struct SMnodeMsg { SRpcMsg rpcMsg; int32_t contLen; void* pCont; -} SMnodeMsg; +} SMndMsg; typedef struct { int32_t dnodeId; @@ -122,7 +122,7 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha * @param pMsg The request msg. * @param code The error code. */ -void mndSendRsp(SMnodeMsg *pMsg, int32_t code); +void mndSendRsp(SMndMsg *pMsg, int32_t code); /** * @brief Process the read, write, sync request. @@ -130,7 +130,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code); * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -void mndProcessMsg(SMnodeMsg *pMsg); +void mndProcessMsg(SMndMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index d31c2d7312e83003dbf290e89131fc9c96b8cb5d..32f17ab5a5d22452eb73c47ef91534a53b6a0c51 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -81,8 +81,6 @@ typedef struct { MndMsgFp msgFp[TDMT_MAX]; SProcObj *pProcess; bool singleProc; - bool isChild; - bool testFlag; } SMnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index d7147bc6eab66a1e2ce98f1dbd40e6cb543dbe33..985ce4942844e06e67993020bc57fa6bd5e36dc3 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -61,7 +61,7 @@ typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_ENV_INIT = 0, DND_ENV_READY = 1, DND_ENV_CLEANUP = 2 } EEnvStat; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); -typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMnodeMsg *pMnodeMsg); +typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMnodeMsg); EStat dndGetStat(SDnode *pDnode); void dndSetStat(SDnode *pDnode, EStat stat); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index d335cc1c664344ad2f78f82343b6bdaeed122246..13443a94b6c3bd29545ac6c9e173afa70fb26b53 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -118,8 +118,6 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) { int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->singleProc = false; - pMgmt->isChild = false; - pMgmt->testFlag = true; int32_t code = mmOpenImp(pDnode, pOption); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 2f32099229514fa14272634229d06161a65ca275..8bcbea4f4d362125d1cdfa7138b077cf1af051db 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -20,12 +20,12 @@ #include "dndTransport.h" #include "dndWorker.h" -static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg); -static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg); -static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg); -static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg); +static int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg); +static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg); +static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg); +static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg); static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc); -static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg); +static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg); int32_t mmStartWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -139,7 +139,7 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg; } -static int32_t mmBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) { +static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; @@ -151,7 +151,7 @@ static int32_t mmBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) { pMsg->rpcMsg = *pRpc; pMsg->createdTime = taosGetTimestampSec(); - char *pCont = (char *)pMsg + sizeof(SMnodeMsg); + char *pCont = (char *)pMsg + sizeof(SMndMsg); memcpy(pCont, pRpc->pCont, pRpc->contLen); pMsg->rpcMsg = *pRpc; pMsg->rpcMsg.pCont = pCont; @@ -164,7 +164,7 @@ static int32_t mmBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) { void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = -1; - SMnodeMsg *pMsg = NULL; + SMndMsg *pMsg = NULL; MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; if (msgFp == NULL) { @@ -172,7 +172,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } - int32_t contLen = sizeof(SMnodeMsg) + pRpc->contLen; + int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; pMsg = taosAllocateQitem(contLen); if (pMsg == NULL) { goto _OVER; @@ -211,16 +211,16 @@ _OVER: rpcFreeCont(pRpc->pCont); } -int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg) { - return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); +int32_t mmProcessWriteMsg(SDnode *pDnode, SMndMsg *pMsg) { + return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); } -int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg) { - return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); +int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg) { + return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); } -int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg) { - return mmPutMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); +int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg) { + return mmPutMndMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); } int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpc) { @@ -231,7 +231,7 @@ int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpc); } -static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg) { +static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg) { SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) return -1; @@ -243,18 +243,18 @@ static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg } static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc) { - int32_t contLen = sizeof(SMnodeMsg) + pRpc->contLen; - SMnodeMsg *pMsg = taosAllocateQitem(contLen); + int32_t contLen = sizeof(SMndMsg) + pRpc->contLen; + SMndMsg *pMsg = taosAllocateQitem(contLen); if (pMsg == NULL) { return -1; } pMsg->contLen = pRpc->contLen; - pMsg->pCont = (char *)pMsg + sizeof(SMnodeMsg); + pMsg->pCont = (char *)pMsg + sizeof(SMndMsg); memcpy(pMsg->pCont, pRpc->pCont, pRpc->contLen); rpcFreeCont(pRpc->pCont); - int32_t code = mmPutMsgToWorker(pDnode, pWorker, pMsg); + int32_t code = mmPutMndMsgToWorker(pDnode, pWorker, pMsg); if (code != 0) { taosFreeQitem(pMsg); } @@ -263,16 +263,33 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs } void mmConsumeChildQueue(SDnode *pDnode, SBlockItem *pBlock) { - SMnodeMsg *pMsg = (SMnodeMsg *)pBlock->pCont; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + SMndMsg *pMsg = (SMndMsg *)pBlock->pCont; + + SRpcMsg *pRpc = &pMsg->rpcMsg; + pRpc->pCont = (char *)pMsg + sizeof(SMndMsg); + + MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; + int32_t code = (*msgFp)(pDnode, pMsg); + + if (code == 0) return; - if (mmPutMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg) != 0) { - // todo + bool isReq = (pRpc->msgType & 1U); + + if (isReq) { + if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { + dndSendRedirectRsp(pDnode, pRpc); + } else { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + rpcSendResponse(&rsp); + } } + taosFreeQitem(pMsg); } void mmConsumeParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pBlock) {} -static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { +static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 0fe24cd7575162a2ecb1bfe41e77ed6df49e2060..b74ddfdf0e88c46ef7f657ffc3add2a3a77b7d91 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -38,11 +38,11 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg); +typedef int32_t (*MndMsgFp)(SMndMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); -typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +typedef int32_t (*ShowMetaFp)(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +typedef int32_t (*ShowRetrieveFp)(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); typedef struct SMnodeLoad { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index a0218eaf65807a571de7271b55a4ca4ccacf891a..d6258c040c1aac4e9373230d55c7098d4e94cdfd 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -47,7 +47,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransProcessRsp(SMnodeMsg *pRsp); +void mndTransProcessRsp(SMndMsg *pRsp); void mndTransPullup(SMnode *pMnode); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index aa87eb43a18a84e04bef9056d3fde8991daa6d2a..0e3bc53219f010b52b88a5d6832af5d94692eff9 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -26,9 +26,9 @@ static SSdbRow *mndAcctActionDecode(SSdbRaw *pRaw); static int32_t mndAcctActionInsert(SSdb *pSdb, SAcctObj *pAcct); static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct); static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew); -static int32_t mndProcessCreateAcctReq(SMnodeMsg *pReq); -static int32_t mndProcessAlterAcctReq(SMnodeMsg *pReq); -static int32_t mndProcessDropAcctReq(SMnodeMsg *pReq); +static int32_t mndProcessCreateAcctReq(SMndMsg *pReq); +static int32_t mndProcessAlterAcctReq(SMndMsg *pReq); +static int32_t mndProcessDropAcctReq(SMndMsg *pReq); int32_t mndInitAcct(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_ACCT, @@ -185,19 +185,19 @@ static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) { return 0; } -static int32_t mndProcessCreateAcctReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateAcctReq(SMndMsg *pReq) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } -static int32_t mndProcessAlterAcctReq(SMnodeMsg *pReq) { +static int32_t mndProcessAlterAcctReq(SMndMsg *pReq) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; } -static int32_t mndProcessDropAcctReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropAcctReq(SMndMsg *pReq) { terrno = TSDB_CODE_MND_MSG_NOT_PROCESSED; mError("failed to process create acct request since %s", terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndAuth.c b/source/dnode/mnode/impl/src/mndAuth.c index ab8b9350b099e01d646b7fb9a43976f10fe93ad3..ea58887af7649908e6a65a5d79d66178b1d8cd64 100644 --- a/source/dnode/mnode/impl/src/mndAuth.c +++ b/source/dnode/mnode/impl/src/mndAuth.c @@ -17,7 +17,7 @@ #include "mndAuth.h" #include "mndUser.h" -static int32_t mndProcessAuthReq(SMnodeMsg *pReq); +static int32_t mndProcessAuthReq(SMndMsg *pReq); int32_t mndInitAuth(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_AUTH, mndProcessAuthReq); @@ -45,7 +45,7 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha return 0; } -static int32_t mndProcessAuthReq(SMnodeMsg *pReq) { +static int32_t mndProcessAuthReq(SMndMsg *pReq) { SAuthReq authReq = {0}; if (tDeserializeSAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index a9921354644c4837305767d0a2f1e2ef568b2654..cea05f16b26fb6ff5cc6d1f1189d487e4f941378 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -29,12 +29,12 @@ static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw); static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); -static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateBnodeReq(SMndMsg *pReq); +static int32_t mndProcessDropBnodeReq(SMndMsg *pReq); +static int32_t mndProcessCreateBnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessDropBnodeRsp(SMndMsg *pRsp); +static int32_t mndGetBnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveBnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter); int32_t mndInitBnode(SMnode *pMnode) { @@ -240,7 +240,7 @@ static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } -static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) { +static int32_t mndCreateBnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) { int32_t code = -1; SBnodeObj bnodeObj = {0}; @@ -266,7 +266,7 @@ CREATE_BNODE_OVER: return code; } -static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateBnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SBnodeObj *pObj = NULL; @@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn return 0; } -static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) { +static int32_t mndDropBnode(SMnode *pMnode, SMndMsg *pReq, SBnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg); @@ -382,7 +382,7 @@ DROP_BNODE_OVER: return code; } -static int32_t mndProcessDropBnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropBnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -430,17 +430,17 @@ DROP_BNODE_OVER: return code; } -static int32_t mndProcessCreateBnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessCreateBnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropBnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessDropBnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetBnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -480,7 +480,7 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveBnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveBnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 3410a386da2d80bafcb35db5f2d97e987f753eb3..a433f70644314e7f85eddfc2b65e619d58f7e597 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -26,8 +26,8 @@ static int32_t mndClusterActionInsert(SSdb *pSdb, SClusterObj *pCluster); static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster); static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOldCluster, SClusterObj *pNewCluster); static int32_t mndCreateDefaultCluster(SMnode *pMnode); -static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetClusterMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveClusters(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextCluster(SMnode *pMnode, void *pIter); int32_t mndInitCluster(SMnode *pMnode) { @@ -163,7 +163,7 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { return sdbWrite(pMnode->pSdb, pRaw); } -static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetClusterMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { int32_t cols = 0; SSchema *pSchema = pMeta->pSchemas; @@ -201,7 +201,7 @@ static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp return 0; } -static int32_t mndRetrieveClusters(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveClusters(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 0f4b538cde51b85a65089a42072a799b6c8dccc0..50b3f7ca9f058fad7bc63fc1601631c149c9e33e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -34,9 +34,9 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessConsumerMetaMsg(SMndMsg *pMsg); +static int32_t mndGetConsumerMeta(SMndMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConsumer(SMndMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); int32_t mndInitConsumer(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 9cde9201eda26dd07d23e89244e68214d2c6c7f0..2e7de8bd8b357c8d8837e380b7ab756f9ccb98ae 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -31,14 +31,14 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); -static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq); -static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq); -static int32_t mndProcessDropDbReq(SMnodeMsg *pReq); -static int32_t mndProcessUseDbReq(SMnodeMsg *pReq); -static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq); -static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq); -static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateDbReq(SMndMsg *pReq); +static int32_t mndProcessAlterDbReq(SMndMsg *pReq); +static int32_t mndProcessDropDbReq(SMndMsg *pReq); +static int32_t mndProcessUseDbReq(SMndMsg *pReq); +static int32_t mndProcessSyncDbReq(SMndMsg *pReq); +static int32_t mndProcessCompactDbReq(SMndMsg *pReq); +static int32_t mndGetDbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveDbs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); int32_t mndInitDb(SMnode *pMnode) { @@ -384,7 +384,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) { +static int32_t mndCreateDb(SMnode *pMnode, SMndMsg *pReq, SCreateDbReq *pCreate, SUserObj *pUser) { SDbObj dbObj = {0}; memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN); memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); @@ -458,7 +458,7 @@ CREATE_DB_OVER: return code; } -static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateDbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -622,7 +622,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndUpdateDb(SMnode *pMnode, SMndMsg *pReq, SDbObj *pOld, SDbObj *pNew) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg); if (pTrans == NULL) goto UPDATE_DB_OVER; @@ -642,7 +642,7 @@ UPDATE_DB_OVER: return code; } -static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) { +static int32_t mndProcessAlterDbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -802,7 +802,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p return 0; } -static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { +static int32_t mndDropDb(SMnode *pMnode, SMndMsg *pReq, SDbObj *pDb) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_DB_OVER; @@ -837,7 +837,7 @@ DROP_DB_OVER: return code; } -static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropDbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -924,7 +924,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { +static int32_t mndProcessUseDbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -1067,7 +1067,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, return 0; } -static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { +static int32_t mndProcessSyncDbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -1108,7 +1108,7 @@ SYNC_DB_OVER: return code; } -static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) { +static int32_t mndProcessCompactDbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -1149,7 +1149,7 @@ SYNC_DB_OVER: return code; } -static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetDbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -1290,7 +1290,7 @@ char *mnGetDbStr(char *src) { return pos; } -static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveDbs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index e231717e61de7ea717cdda2d4b686610bf1dd632..e3b1c0a177b6f0c478cc0450fcac616bcbece6bb 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -49,17 +49,17 @@ static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode); static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew); -static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessStatusReq(SMnodeMsg *pReq); - -static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateDnodeReq(SMndMsg *pReq); +static int32_t mndProcessDropDnodeReq(SMndMsg *pReq); +static int32_t mndProcessConfigDnodeReq(SMndMsg *pReq); +static int32_t mndProcessConfigDnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessStatusReq(SMndMsg *pReq); + +static int32_t mndGetConfigMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConfigs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter); -static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetDnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveDnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); int32_t mndInitDnode(SMnode *pMnode) { @@ -296,7 +296,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return 0; } -static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { +static int32_t mndProcessStatusReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SStatusReq statusReq = {0}; SDnodeObj *pDnode = NULL; @@ -437,7 +437,7 @@ PROCESS_STATUS_MSG_OVER: return code; } -static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq *pCreate) { +static int32_t mndCreateDnode(SMnode *pMnode, SMndMsg *pReq, SCreateDnodeReq *pCreate) { SDnodeObj dnodeObj = {0}; dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE); dnodeObj.createdTime = taosGetTimestampMs(); @@ -471,7 +471,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq * return 0; } -static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateDnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -521,7 +521,7 @@ CREATE_DNODE_OVER: return code; } -static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) { +static int32_t mndDropDnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, &pReq->rpcMsg); if (pTrans == NULL) { mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); @@ -547,7 +547,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) return 0; } -static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropDnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -596,7 +596,7 @@ DROP_DNODE_OVER: return code; } -static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessConfigDnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SMCfgDnodeReq cfgReq = {0}; @@ -628,11 +628,11 @@ static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) { return 0; } -static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessConfigDnodeRsp(SMndMsg *pRsp) { mInfo("app:%p config rsp from dnode", pRsp->rpcMsg.ahandle); } -static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetConfigMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { int32_t cols = 0; SSchema *pSchema = pMeta->pSchemas; @@ -663,7 +663,7 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp return 0; } -static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveConfigs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; int32_t totalRows = 0; int32_t numOfRows = 0; @@ -709,7 +709,7 @@ static int32_t mndRetrieveConfigs(SMnodeMsg *pReq, SShowObj *pShow, char *data, static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter) {} -static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetDnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -773,7 +773,7 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveDnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index b2daf848c53262f45fc18196398ba84ff4fe1d38..90162d75ab23b436575e984dec2c77950337ed2d 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -29,13 +29,13 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw); static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc); static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew); -static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate); -static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc); -static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq); -static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq); -static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq); -static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndCreateFunc(SMnode *pMnode, SMndMsg *pReq, SCreateFuncReq *pCreate); +static int32_t mndDropFunc(SMnode *pMnode, SMndMsg *pReq, SFuncObj *pFunc); +static int32_t mndProcessCreateFuncReq(SMndMsg *pReq); +static int32_t mndProcessDropFuncReq(SMndMsg *pReq); +static int32_t mndProcessRetrieveFuncReq(SMndMsg *pReq); +static int32_t mndGetFuncMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveFuncs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter); int32_t mndInitFunc(SMnode *pMnode) { @@ -181,7 +181,7 @@ static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) { sdbRelease(pSdb, pFunc); } -static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pCreate) { +static int32_t mndCreateFunc(SMnode *pMnode, SMndMsg *pReq, SCreateFuncReq *pCreate) { int32_t code = -1; STrans *pTrans = NULL; @@ -234,7 +234,7 @@ CREATE_FUNC_OVER: return code; } -static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) { +static int32_t mndDropFunc(SMnode *pMnode, SMndMsg *pReq, SFuncObj *pFunc) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_FUNC_OVER; @@ -262,7 +262,7 @@ DROP_FUNC_OVER: return code; } -static int32_t mndProcessCreateFuncReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateFuncReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -339,7 +339,7 @@ CREATE_FUNC_OVER: return code; } -static int32_t mndProcessDropFuncReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropFuncReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -394,7 +394,7 @@ DROP_FUNC_OVER: return code; } -static int32_t mndProcessRetrieveFuncReq(SMnodeMsg *pReq) { +static int32_t mndProcessRetrieveFuncReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SRetrieveFuncReq retrieveReq = {0}; @@ -463,7 +463,7 @@ RETRIEVE_FUNC_OVER: return code; } -static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetFuncMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -545,7 +545,7 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le return tDataTypes[type].name; } -static int32_t mndRetrieveFuncs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveFuncs(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 9a6297a0f47f1293afc416aff67a071ab2cdab85..5f0e0874df3d6b76b117352a49f0019b6e73bf58 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -30,13 +30,13 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw); static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew); -static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateMnodeReq(SMndMsg *pReq); +static int32_t mndProcessDropMnodeReq(SMndMsg *pReq); +static int32_t mndProcessCreateMnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessAlterMnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessDropMnodeRsp(SMndMsg *pRsp); +static int32_t mndGetMnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveMnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { @@ -355,7 +355,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } -static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { +static int32_t mndCreateMnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { int32_t code = -1; SMnodeObj mnodeObj = {0}; @@ -380,7 +380,7 @@ CREATE_MNODE_OVER: return code; } -static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateMnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SMnodeObj *pObj = NULL; @@ -527,7 +527,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode return 0; } -static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) { +static int32_t mndDropMnode(SMnode *pMnode, SMndMsg *pReq, SMnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg); @@ -547,7 +547,7 @@ DROP_MNODE_OVER: return code; } -static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropMnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -595,22 +595,22 @@ DROP_MNODE_OVER: return code; } -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessCreateMnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessAlterMnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessDropMnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetMnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -663,7 +663,7 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveMnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveMnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index ac9e99ebd43769559c47868275d84da2c4066001..d61840f9dfe54c247a12f9c223aa3200f21a9a62 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -32,7 +32,7 @@ static int32_t mndOffsetActionInsert(SSdb *pSdb, SMqOffsetObj *pOffset); static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset); static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOffset, SMqOffsetObj *pNewOffset); -static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pReq); +static int32_t mndProcessCommitOffsetReq(SMndMsg *pReq); int32_t mndInitOffset(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_OFFSET, @@ -152,7 +152,7 @@ int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicNam return 0; } -static int32_t mndProcessCommitOffsetReq(SMnodeMsg *pMsg) { +static int32_t mndProcessCommitOffsetReq(SMndMsg *pMsg) { char key[TSDB_PARTITION_KEY_LEN]; SMnode *pMnode = pMsg->pMnode; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index dbf299e8da04d30845985414642d6509d3dbd218..6a531ffa78778290134c2359ebf8dcb92ee4f81b 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -50,14 +50,14 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); -static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq); -static int32_t mndProcessConnectReq(SMnodeMsg *pReq); -static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq); -static int32_t mndProcessKillConnReq(SMnodeMsg *pReq); -static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); -static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessHeartBeatReq(SMndMsg *pReq); +static int32_t mndProcessConnectReq(SMndMsg *pReq); +static int32_t mndProcessKillQueryReq(SMndMsg *pReq); +static int32_t mndProcessKillConnReq(SMndMsg *pReq); +static int32_t mndGetConnsMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConns(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetQueryMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveQueries(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); int32_t mndInitProfile(SMnode *pMnode) { @@ -177,7 +177,7 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { } } -static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { +static int32_t mndProcessConnectReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SUserObj *pUser = NULL; SDbObj *pDb = NULL; @@ -338,7 +338,7 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { return NULL; } -static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { +static int32_t mndProcessHeartBeatReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SClientHbBatchReq batchReq = {0}; @@ -500,7 +500,7 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { #endif } -static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) { +static int32_t mndProcessKillQueryReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -534,7 +534,7 @@ static int32_t mndProcessKillQueryReq(SMnodeMsg *pReq) { } } -static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) { +static int32_t mndProcessKillConnReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -566,7 +566,7 @@ static int32_t mndProcessKillConnReq(SMnodeMsg *pReq) { } } -static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetConnsMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -641,7 +641,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveConns(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; int32_t numOfRows = 0; SConnObj *pConn = NULL; @@ -700,7 +700,7 @@ static int32_t mndRetrieveConns(SMnodeMsg *pReq, SShowObj *pShow, char *data, in return numOfRows; } -static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetQueryMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -815,7 +815,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveQueries(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveQueries(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; int32_t numOfRows = 0; SConnObj *pConn = NULL; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 0c227b0db9def38096fc636a40ba627d9a6acc17..68e6ed96ebf6ca19d6cd6acc1314ff976d39f602 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -29,12 +29,12 @@ static SSdbRow *mndQnodeActionDecode(SSdbRaw *pRaw); static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj); static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew); -static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateQnodeReq(SMndMsg *pReq); +static int32_t mndProcessDropQnodeReq(SMndMsg *pReq); +static int32_t mndProcessCreateQnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessDropQnodeRsp(SMndMsg *pRsp); +static int32_t mndGetQnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveQnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter); int32_t mndInitQnode(SMnode *pMnode) { @@ -240,7 +240,7 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } -static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { +static int32_t mndCreateQnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { int32_t code = -1; SQnodeObj qnodeObj = {0}; @@ -266,7 +266,7 @@ CREATE_QNODE_OVER: return code; } -static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateQnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SQnodeObj *pObj = NULL; @@ -363,7 +363,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn return 0; } -static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) { +static int32_t mndDropQnode(SMnode *pMnode, SMndMsg *pReq, SQnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, &pReq->rpcMsg); @@ -382,7 +382,7 @@ DROP_QNODE_OVER: return code; } -static int32_t mndProcessDropQnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropQnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -430,17 +430,17 @@ DROP_QNODE_OVER: return code; } -static int32_t mndProcessCreateQnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessCreateQnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropQnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessDropQnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetQnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -480,7 +480,7 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveQnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveQnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8fd0c282e15224a051225f04d911a98d2191e0a0..f9a26f430de9b5a195dbbf369a758bac0bfd1579 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -22,8 +22,8 @@ static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); -static int32_t mndProcessShowReq(SMnodeMsg *pReq); -static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq); +static int32_t mndProcessShowReq(SMndMsg *pReq); +static int32_t mndProcessRetrieveReq(SMndMsg *pReq); static bool mndCheckRetrieveFinished(SShowObj *pShow); int32_t mndInitShow(SMnode *pMnode) { @@ -115,7 +115,7 @@ static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); } -static int32_t mndProcessShowReq(SMnodeMsg *pReq) { +static int32_t mndProcessShowReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t code = -1; @@ -176,7 +176,7 @@ SHOW_OVER: return code; } -static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) { +static int32_t mndProcessRetrieveReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SShowMgmt *pMgmt = &pMnode->showMgmt; int32_t rowsToRead = 0; diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 6040aa088c8debc93a6149866e28da363762838c..da26ef3e6ade5d759ca9286ae7e584ecfc76f59a 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -29,12 +29,12 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw); static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj); static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew); -static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq); -static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp); -static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveSnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateSnodeReq(SMndMsg *pReq); +static int32_t mndProcessDropSnodeReq(SMndMsg *pReq); +static int32_t mndProcessCreateSnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessDropSnodeRsp(SMndMsg *pRsp); +static int32_t mndGetSnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveSnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter); int32_t mndInitSnode(SMnode *pMnode) { @@ -240,7 +240,7 @@ static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S return 0; } -static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) { +static int32_t mndCreateSnode(SMnode *pMnode, SMndMsg *pReq, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) { int32_t code = -1; SSnodeObj snodeObj = {0}; @@ -267,7 +267,7 @@ CREATE_SNODE_OVER: return code; } -static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateSnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SSnodeObj *pObj = NULL; @@ -365,7 +365,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn return 0; } -static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) { +static int32_t mndDropSnode(SMnode *pMnode, SMndMsg *pReq, SSnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg); @@ -385,7 +385,7 @@ DROP_SNODE_OVER: return code; } -static int32_t mndProcessDropSnodeReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropSnodeReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -433,17 +433,17 @@ DROP_SNODE_OVER: return code; } -static int32_t mndProcessCreateSnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessCreateSnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropSnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessDropSnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetSnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -483,7 +483,7 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveSnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveSnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index d8091dfa760bc939d60867852d74aae459aa207a..f24ba8c1882d04533b20526e46d5616eaf839852 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -33,15 +33,15 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); -static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq); -static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq); -static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq); -static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp); -static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp); -static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp); -static int32_t mndProcessTableMetaReq(SMnodeMsg *pReq); -static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessMCreateStbReq(SMndMsg *pReq); +static int32_t mndProcessMAlterStbReq(SMndMsg *pReq); +static int32_t mndProcessMDropStbReq(SMndMsg *pReq); +static int32_t mndProcessVCreateStbRsp(SMndMsg *pRsp); +static int32_t mndProcessVAlterStbRsp(SMndMsg *pRsp); +static int32_t mndProcessVDropStbRsp(SMndMsg *pRsp); +static int32_t mndProcessTableMetaReq(SMndMsg *pReq); +static int32_t mndGetStbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveStb(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); int32_t mndInitStb(SMnode *pMnode) { @@ -490,7 +490,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateStb(SMnode *pMnode, SMndMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { SStbObj stbObj = {0}; memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN); memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -551,7 +551,7 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) { +static int32_t mndProcessMCreateStbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SStbObj *pTopicStb = NULL; @@ -623,7 +623,7 @@ CREATE_STB_OVER: return code; } -static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessVCreateStbRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -980,7 +980,7 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { +static int32_t mndAlterStb(SMnode *pMnode, SMndMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) { SStbObj stbObj = {0}; taosRLockLatch(&pOld->lock); memcpy(&stbObj, pOld, sizeof(SStbObj)); @@ -1043,7 +1043,7 @@ ALTER_STB_OVER: return code; } -static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { +static int32_t mndProcessMAlterStbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SDbObj *pDb = NULL; @@ -1096,7 +1096,7 @@ ALTER_STB_OVER: return code; } -static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessVAlterStbRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -1160,7 +1160,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * return 0; } -static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { +static int32_t mndDropStb(SMnode *pMnode, SMndMsg *pReq, SDbObj *pDb, SStbObj *pStb) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_STB_OVER; @@ -1180,7 +1180,7 @@ DROP_STB_OVER: return code; } -static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) { +static int32_t mndProcessMDropStbReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -1237,7 +1237,7 @@ DROP_STB_OVER: return code; } -static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessVDropStbRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -1311,7 +1311,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char return code; } -static int32_t mndProcessTableMetaReq(SMnodeMsg *pReq) { +static int32_t mndProcessTableMetaReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; STableInfoReq infoReq = {0}; @@ -1436,7 +1436,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs return 0; } -static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetStbMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -1499,7 +1499,7 @@ static void mndExtractTableName(char *tableId, char *name) { } } -static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveStb(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 54ad9cd7e239b20a84eaa6f7c791b1942413598f..cd05710f5173c1c37f35a76c10a2c0fedeffc0ed 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -31,12 +31,12 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); -static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq); -/*static int32_t mndProcessDropStreamReq(SMnodeMsg *pReq);*/ -/*static int32_t mndProcessDropStreamInRsp(SMnodeMsg *pRsp);*/ -static int32_t mndProcessStreamMetaReq(SMnodeMsg *pReq); -static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateStreamReq(SMndMsg *pReq); +/*static int32_t mndProcessDropStreamReq(SMndMsg *pReq);*/ +/*static int32_t mndProcessDropStreamInRsp(SMndMsg *pRsp);*/ +static int32_t mndProcessStreamMetaReq(SMndMsg *pReq); +static int32_t mndGetStreamMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveStream(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { @@ -208,7 +208,7 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateStream(SMnode *pMnode, SMndMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); @@ -247,7 +247,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateStreamR return 0; } -static int32_t mndProcessCreateStreamReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateStreamReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SStreamObj *pStream = NULL; @@ -339,7 +339,7 @@ static int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfS return 0; } -static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetStreamMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -383,7 +383,7 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp return 0; } -static int32_t mndRetrieveStream(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveStream(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 1d964c4383a668370aa8c41e226890fc1f46187c..fa0a32d045a9455d947be9bb89041f0c7ff3cd6e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -48,14 +48,14 @@ static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); -static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); -static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); -static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); -static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); -static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeReq(SMndMsg *pMsg); +static int32_t mndProcessSubscribeRsp(SMndMsg *pMsg); +static int32_t mndProcessSubscribeInternalReq(SMndMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SMndMsg *pMsg); +static int32_t mndProcessMqTimerMsg(SMndMsg *pMsg); +static int32_t mndProcessGetSubEpReq(SMndMsg *pMsg); +static int32_t mndProcessDoRebalanceMsg(SMndMsg *pMsg); +static int32_t mndProcessResetOffsetReq(SMndMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); @@ -211,7 +211,7 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq } #if 0 -static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) { +static int32_t mndProcessResetOffsetReq(SMndMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; uint8_t *str = pMsg->rpcMsg.pCont; SMqCMResetOffsetReq req; @@ -249,7 +249,7 @@ static int32_t mndProcessResetOffsetReq(SMnodeMsg *pMsg) { } #endif -static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { +static int32_t mndProcessGetSubEpReq(SMndMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpRsp rsp = {0}; @@ -356,7 +356,7 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { return pRebSub; } -static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { +static int32_t mndProcessMqTimerMsg(SMndMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SSdb *pSdb = pMnode->pSdb; SMqConsumerObj *pConsumer; @@ -428,7 +428,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { +static int32_t mndProcessDoRebalanceMsg(SMndMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); @@ -994,7 +994,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { sdbRelease(pSdb, pSub); } -static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { +static int32_t mndProcessSubscribeReq(SMndMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; char *msgStr = pMsg->rpcMsg.pCont; SCMSubscribeReq subscribe; @@ -1156,7 +1156,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessSubscribeInternalRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 453535c6e762211fcd5d5ff9687197a2af9cd983..0298a2fce11eed851941786edcba7ffeb577ea10 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -79,7 +79,7 @@ static char* mndBuildTelemetryReport(SMnode* pMnode) { return pCont; } -static int32_t mndProcessTelemTimer(SMnodeMsg* pReq) { +static int32_t mndProcessTelemTimer(SMndMsg* pReq) { SMnode* pMnode = pReq->pMnode; STelemMgmt* pMgmt = &pMnode->telemMgmt; if (!pMgmt->enable) return 0; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 7d7c5f9975f3498f34ffb54640dcbb59a0bfd891..8982fa70610e2f9e20b88d4e00a13d5b85eb2a27 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -31,12 +31,12 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); -static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq); -static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq); -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp); -static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq); -static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateTopicReq(SMndMsg *pReq); +static int32_t mndProcessDropTopicReq(SMndMsg *pReq); +static int32_t mndProcessDropTopicInRsp(SMndMsg *pRsp); +static int32_t mndProcessTopicMetaReq(SMndMsg *pReq); +static int32_t mndGetTopicMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTopic(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { @@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMndMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -276,7 +276,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq return 0; } -static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateTopicReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SMqTopicObj *pTopic = NULL; @@ -341,7 +341,7 @@ CREATE_TOPIC_OVER: return code; } -static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { +static int32_t mndDropTopic(SMnode *pMnode, SMndMsg *pReq, SMqTopicObj *pTopic) { // TODO: cannot drop when subscribed STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { @@ -368,7 +368,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic return 0; } -static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropTopicReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SMDropTopicReq dropReq = {0}; @@ -403,7 +403,7 @@ static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessDropTopicInRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } @@ -435,7 +435,7 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetTopicMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -479,7 +479,7 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveTopic(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index a009e01a52cfd1a535529cd71c47235b000a828a..c5e872f22d04fc63ce51d63e1ea8cad891ff31b7 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -55,11 +55,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); -static int32_t mndProcessTransReq(SMnodeMsg *pReq); -static int32_t mndProcessKillTransReq(SMnodeMsg *pReq); +static int32_t mndProcessTransReq(SMndMsg *pReq); +static int32_t mndProcessKillTransReq(SMndMsg *pReq); -static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetTransMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTrans(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); int32_t mndInitTrans(SMnode *pMnode) { @@ -774,7 +774,7 @@ static void mndTransSendRpcRsp(STrans *pTrans) { } } -void mndTransProcessRsp(SMnodeMsg *pRsp) { +void mndTransProcessRsp(SMndMsg *pRsp) { SMnode *pMnode = pRsp->pMnode; int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle); int32_t transId = (int32_t)(signature >> 32); @@ -1157,7 +1157,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { mndTransSendRpcRsp(pTrans); } -static int32_t mndProcessTransReq(SMnodeMsg *pReq) { +static int32_t mndProcessTransReq(SMndMsg *pReq) { mndTransPullup(pReq->pMnode); return 0; } @@ -1199,7 +1199,7 @@ static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { return 0; } -static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { +static int32_t mndProcessKillTransReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; SKillTransReq killReq = {0}; int32_t code = -1; @@ -1257,7 +1257,7 @@ void mndTransPullup(SMnode *pMnode) { sdbWriteFile(pMnode->pSdb); } -static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetTransMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -1320,7 +1320,7 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveTrans(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index b1cdf484b4805a9355da48db15cf5df5ca7ea91b..38af07d46be6fea3cb64ed88a3329ecfd17ba214 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -30,13 +30,13 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw); static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser); static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser); static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew); -static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SMnodeMsg *pReq); -static int32_t mndProcessCreateUserReq(SMnodeMsg *pReq); -static int32_t mndProcessAlterUserReq(SMnodeMsg *pReq); -static int32_t mndProcessDropUserReq(SMnodeMsg *pReq); -static int32_t mndProcessGetUserAuthReq(SMnodeMsg *pReq); -static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveUsers(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SMndMsg *pReq); +static int32_t mndProcessCreateUserReq(SMndMsg *pReq); +static int32_t mndProcessAlterUserReq(SMndMsg *pReq); +static int32_t mndProcessDropUserReq(SMndMsg *pReq); +static int32_t mndProcessGetUserAuthReq(SMndMsg *pReq); +static int32_t mndGetUserMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveUsers(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextUser(SMnode *pMnode, void *pIter); int32_t mndInitUser(SMnode *pMnode) { @@ -261,7 +261,7 @@ void mndReleaseUser(SMnode *pMnode, SUserObj *pUser) { sdbRelease(pSdb, pUser); } -static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SMnodeMsg *pReq) { +static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate, SMndMsg *pReq) { SUserObj userObj = {0}; taosEncryptPass_c((uint8_t *)pCreate->pass, strlen(pCreate->pass), userObj.pass); tstrncpy(userObj.user, pCreate->user, TSDB_USER_LEN); @@ -295,7 +295,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate return 0; } -static int32_t mndProcessCreateUserReq(SMnodeMsg *pReq) { +static int32_t mndProcessCreateUserReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -349,7 +349,7 @@ CREATE_USER_OVER: return code; } -static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) { +static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMndMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to update since %s", pOld->user, terrstr()); @@ -397,7 +397,7 @@ static SHashObj *mndDupDbHash(SHashObj *pOld) { return pNew; } -static int32_t mndProcessAlterUserReq(SMnodeMsg *pReq) { +static int32_t mndProcessAlterUserReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -510,7 +510,7 @@ ALTER_USER_OVER: return code; } -static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) { +static int32_t mndDropUser(SMnode *pMnode, SMndMsg *pReq, SUserObj *pUser) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to drop since %s", pUser->user, terrstr()); @@ -536,7 +536,7 @@ static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) { return 0; } -static int32_t mndProcessDropUserReq(SMnodeMsg *pReq) { +static int32_t mndProcessDropUserReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -585,7 +585,7 @@ DROP_USER_OVER: return code; } -static int32_t mndProcessGetUserAuthReq(SMnodeMsg *pReq) { +static int32_t mndProcessGetUserAuthReq(SMndMsg *pReq) { SMnode *pMnode = pReq->pMnode; int32_t code = -1; SUserObj *pUser = NULL; @@ -647,7 +647,7 @@ GET_AUTH_OVER: return code; } -static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetUserMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -693,7 +693,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p return 0; } -static int32_t mndRetrieveUsers(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveUsers(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index f7b177f170daa28f499d7b8a8d9fd22fcc98562c..53deff8967d1eba14949e9888eeb7b63a582fe6b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -29,17 +29,17 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); -static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp); -static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp); - -static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndProcessCreateVnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessAlterVnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessDropVnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessSyncVnodeRsp(SMndMsg *pRsp); +static int32_t mndProcessCompactVnodeRsp(SMndMsg *pRsp); + +static int32_t mndGetVgroupMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveVgroups(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); -static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetVnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveVnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); int32_t mndInitVgroup(SMnode *pMnode) { @@ -452,24 +452,24 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { return epset; } -static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessCreateVnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessAlterVnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pRsp) { +static int32_t mndProcessDropVnodeRsp(SMndMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; } -static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pRsp) { return 0; } +static int32_t mndProcessSyncVnodeRsp(SMndMsg *pRsp) { return 0; } -static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pRsp) { return 0; } +static int32_t mndProcessCompactVnodeRsp(SMndMsg *pRsp) { return 0; } static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SVgObj *pVgroup = pObj; @@ -500,7 +500,7 @@ static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pRep return 0; } -static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetVgroupMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -551,7 +551,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp return 0; } -static int32_t mndRetrieveVgroups(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveVgroups(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; @@ -624,7 +624,7 @@ int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId) { return numOfVnodes; } -static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { +static int32_t mndGetVnodeMeta(SMndMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -664,7 +664,7 @@ static int32_t mndGetVnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static int32_t mndRetrieveVnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveVnodes(SMndMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 92c514f35908dad4ec0e14045ba8d978aad3d33d..b1898c061b77a3b78416b1db5d54595eaf2b903d 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -390,12 +390,12 @@ void mndDestroy(const char *path) { mDebug("mnode is destroyed"); } -void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { +void mndSendRsp(SMndMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; rpcSendResponse(&rpcRsp); } -void mndProcessMsg(SMnodeMsg *pMsg) { +void mndProcessMsg(SMndMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 17bc432e8bfa78a3b3296f3da985eca4dfd0a4a8..e445f7fcd0bea92e5c378d11c129bfa8ff6ec861 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -17,6 +17,7 @@ #include "tprocess.h" #include "taoserror.h" #include "tlog.h" +#include "tqueue.h" #define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define CEIL4(n) (ceil((float)(n) / 4) * 4) @@ -142,7 +143,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, SBlockItem **ppItem) { SBlockItem *pBlock = (SBlockItem *)(pQueue->pBuffer + pQueue->head); - SBlockItem *pItem = malloc(pBlock->contLen); + SBlockItem *pItem = taosAllocateQitem(pBlock->contLen); if (pItem == NULL) { pthread_mutex_unlock(&pQueue->mutex); tsem_post(&pQueue->sem);