diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f879838d63bd6965d5aa259ecff73571125c089a..d1ec07e28503b90ff5890e876a8ac309934f1dba 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -583,8 +583,8 @@ bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) { } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; - assert(pMsg->ahandle != NULL); + SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; + assert(pMsg->info.ahandle != NULL); if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); @@ -615,7 +615,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); } - SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle}; + SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle}; if (pMsg->contLen > 0) { buf.pData = taosMemoryCalloc(1, pMsg->contLen); @@ -956,7 +956,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de void* clientRpc = NULL; SServerStatusRsp statusRsp = {0}; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; - SRpcMsg rpcMsg = {.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS}; + SRpcMsg rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS}; SRpcMsg rpcRsp = {0}; SRpcInit rpcInit = {0}; char pass[TSDB_PASSWORD_LEN + 1] = {0}; diff --git a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h index 1923d049a581742c5e44e93d7bf137283cdf99e8..847df28f78a45b8019b9c26c1a98665762ad6f6e 100644 --- a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h +++ b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h @@ -36,15 +36,15 @@ typedef struct SBnodeMgmt { // bmHandle.c SArray *bmGetMsgHandles(); -int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); -int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); +int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); // bmWorker.c int32_t bmStartWorker(SBnodeMgmt *pMgmt); void bmStopWorker(SBnodeMgmt *pMgmt); -int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index ab2ca890928c0753251fcdbfde1d37ef3ef9407c..407d698faae7a9f233478bb0b1c5f15d2b37fa43 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -18,7 +18,7 @@ static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} -int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonBmInfo bmInfo = {0}; bmGetMonitorInfo(pMgmt, &bmInfo); dmGetMonitorSystemInfo(&bmInfo.sys); @@ -37,14 +37,14 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; tFreeSMonBmInfo(&bmInfo); return 0; } -int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDCreateBnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { @@ -67,8 +67,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return 0; } -int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDDropBnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index 1ab8b5ef3454928dc0d63024708c7de947cd54e4..b8400e1197d95c83deefead2a25574ca52acd045 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -16,23 +16,23 @@ #define _DEFAULT_SOURCE #include "bmInt.h" -static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) { +static void bmSendErrorRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, .code = code, - .refId = pMsg->rpcMsg.refId, + .info.refId = pMsg->info.refId, }; tmsgSendRsp(&rpcRsp); dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { for (int32_t i = 0; i < numOfMsgs; ++i) { - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); if (pMsg != NULL) { bmSendErrorRsp(pMsg, code); @@ -40,24 +40,24 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { } } -static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, +static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, + .info.refId = pMsg->info.refId, .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen}; + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen,}; tmsgSendRsp(&rsp); } -static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SBnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from bnode-monitor queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; + SRpcMsg *pRpc = pMsg; int32_t code = -1; - if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) { + if (pMsg->msgType == TDMT_MON_BM_INFO) { code = bmProcessGetMonBmInfoReq(pMgmt, pMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; @@ -76,14 +76,14 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SBnodeMgmt *pMgmt = pInfo->ahandle; - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { bmSendErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); return; } for (int32_t i = 0; i < numOfMsgs; ++i) { - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); if (pMsg != NULL) { dTrace("msg:%p, get from bnode-write queue", pMsg); @@ -96,17 +96,17 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO bndProcessWMsgs(pMgmt->pBnode, pArray); for (size_t i = 0; i < numOfMsgs; i++) { - SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); if (pMsg != NULL) { dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } taosArrayDestroy(pArray); } -int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMultiWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -114,7 +114,7 @@ int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index a01e895913ec06c04cd7c2c279931f94495586be..1dd78c5a409831b268227da0443405139f84694a 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -39,10 +39,10 @@ typedef struct SDnodeMgmt { // dmHandle.c SArray *dmGetMsgHandles(); void dmSendStatusReq(SDnodeMgmt *pMgmt); -int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); // dmMonitor.c void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo); @@ -50,7 +50,7 @@ void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo); void dmSendMonitorReport(SDnodeMgmt *pMgmt); // dmWorker.c -int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmStartStatusThread(SDnodeMgmt *pMgmt); void dmStopStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 80adacbf5a97625e376d4d70182ee61e2d85b47a..ddc22b9d0531283e48293ce7d6a1b18afaad7d45 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -83,29 +83,25 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { tSerializeSStatusReq(pHead, contLen, &req); tFreeSStatusReq(&req); - SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527}; + SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527}; SRpcMsg rpcRsp = {0}; - dTrace("send status msg to mnode, app:%p", rpcMsg.ahandle); + dTrace("send status msg to mnode, app:%p", rpcMsg.info.ahandle); tmsgSendMnodeRecv(&rpcMsg, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp); } -int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pRsp = &pMsg->rpcMsg; +int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { dError("auth rsp is received, but not supported yet"); return 0; } -int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pRsp = &pMsg->rpcMsg; +int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { dError("grant rsp is received, but not supported yet"); return 0; } -int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; - SDCfgDnodeReq *pCfg = pReq->pCont; +int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { dError("config req is received, but not supported yet"); return TSDB_CODE_OPS_NOT_SUPPORT; } @@ -139,12 +135,12 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { taosArrayDestroy(vinfo.pVloads); } -int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { dDebug("server run status req is received"); SServerStatusRsp statusRsp = {0}; dmGetServerRunStatus(pMgmt, &statusRsp); - SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .refId = pMsg->rpcMsg.refId}; + SRpcMsg rspMsg = {.info.handle = pMsg->info.handle, .info.ahandle = pMsg->info.ahandle, .info.refId = pMsg->info.refId}; int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); if (rspLen < 0) { rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; @@ -158,8 +154,8 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { } tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp); - pMsg->pRsp = pRsp; - pMsg->rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; return 0; } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 7f36beea0de8062b3d1aa20b030315fea6536fcc..a7f08b8cef99f63fdb655de86269c5c4aa232c09 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -98,10 +98,10 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { } } -static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; + tmsg_t msgType = pMsg->msgType; bool isRequest = msgType & 1u; dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType)); @@ -150,18 +150,18 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { if (isRequest) { if (code != 0 && terrno != 0) code = terrno; SRpcMsg rsp = { - .handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, .code = code, - .refId = pMsg->rpcMsg.refId, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen, + .info.refId = pMsg->info.refId, + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, }; rpcSendResponse(&rsp); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -187,7 +187,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { dDebug("dnode workers are closed"); } -int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 87dbe702beb45948d61bd44025ab3d3363c801e6..2a8c12a9098a1ecad0792b10d477f19ad3725d3e 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -48,20 +48,20 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); // mmHandle.c SArray *mmGetMsgHandles(); -int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); -int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 0629dc91236b37ad7e12700b8a86bd31bb34ba6c..44a54c27403fd712c98e1966465304360927a6f2 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -20,7 +20,7 @@ static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) { mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant); } -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonMmInfo mmInfo = {0}; mmGetMonitorInfo(pMgmt, &mmInfo); dmGetMonitorSystemInfo(&mmInfo.sys); @@ -39,8 +39,8 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; tFreeSMonMmInfo(&mmInfo); return 0; } @@ -50,7 +50,7 @@ static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { mndGetLoad(pMgmt->pMnode, &pInfo->load); } -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonMloadInfo mloads = {0}; mmGetMnodeLoads(pMgmt, &mloads); @@ -67,13 +67,13 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; return 0; } -int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDCreateMnodeReq createReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { @@ -100,8 +100,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return 0; } -int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDDropMnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { @@ -124,8 +124,8 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDAlterMnodeReq alterReq = {0}; if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 622b8332fce15a61af82d3bcd64612dcd40960fe..ee65335382a79e66220e5c10d42240738c56af20 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -16,26 +16,22 @@ #define _DEFAULT_SOURCE #include "mmInt.h" -static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { +static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen, + .info = pMsg->info, + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, }; tmsgSendRsp(&rsp); } -static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; - bool isRequest = msgType & 1U; - dTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(msgType)); + dTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - switch (msgType) { + switch (pMsg->msgType) { case TDMT_DND_ALTER_MNODE: code = mmProcessAlterReq(pMgmt, pMsg); break; @@ -46,69 +42,67 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { code = mmProcessGetLoadsReq(pMgmt, pMsg); break; default: - pMsg->pNode = pMgmt->pMnode; + pMsg->info.node = pMgmt->pMnode; code = mndProcessMsg(pMsg); } - if (isRequest) { - if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - if (code != 0 && terrno != 0) code = terrno; - mmSendRsp(pMsg, code); - } + if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0 && terrno != 0) code = terrno; + mmSendRsp(pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } -static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; + tmsg_t msgType = pMsg->msgType; bool isRequest = msgType & 1U; dTrace("msg:%p, get from mnode-query queue", pMsg); - pMsg->pNode = pMgmt->pMnode; + pMsg->info.node = pMgmt->pMnode; code = mndProcessMsg(pMsg); if (isRequest) { - if (pMsg->rpcMsg.handle != NULL && code != 0) { + if (pMsg->info.handle != NULL && code != 0) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); } } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } -static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { - dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->rpcMsg.msgType)); +static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { + dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); return 0; } -int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); } +int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); } -int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); } -int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } +int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); } -int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); +int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } -int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); } static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); - pMsg->rpcMsg = *pRpc; + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); taosWriteQitem(pWorker->queue, pMsg); return 0; } diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index 8b48113dd31f3bdfd42887a75c386ced1cba85e9..66950030206300dd77a82fdeb91fa7c5229b5aa6 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -37,9 +37,9 @@ typedef struct SQnodeMgmt { // qmHandle.c SArray *qmGetMsgHandles(); -int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); -int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq); // qmWorker.c int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); @@ -48,9 +48,9 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype); int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); -int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index 06649b835e265f05cc41c8487c54cc40e4031d10..25993e2d5b37b42f7c93996b6ed0ae041027271a 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -18,7 +18,7 @@ static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonQmInfo qmInfo = {0}; qmGetMonitorInfo(pMgmt, &qmInfo); dmGetMonitorSystemInfo(&qmInfo.sys); @@ -37,14 +37,14 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; tFreeSMonQmInfo(&qmInfo); return 0; } -int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDCreateQnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { @@ -67,8 +67,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return 0; } -int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDDropQnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index daac7f80bbc847fcb6e5d0113bb7637a2610a754..f0f7a07f44fc411ada0fe2d51922ca950bcd6084 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -16,26 +16,26 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) { +static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, + .info.refId = pMsg->info.refId, .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen, + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, }; tmsgSendRsp(&rsp); } -static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void qmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from qnode-monitor queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; + SRpcMsg *pRpc = pMsg; int32_t code = -1; - if (pMsg->rpcMsg.msgType == TDMT_MON_QM_INFO) { + if (pMsg->msgType == TDMT_MON_QM_INFO) { code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; @@ -51,11 +51,11 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void qmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from qnode-query queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; + SRpcMsg *pRpc = pMsg; int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc); if (pRpc->msgType & 1U && code != 0) { @@ -63,15 +63,15 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } -static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void qmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from qnode-fetch queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; + SRpcMsg *pRpc = pMsg; int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc); if (pRpc->msgType & 1U && code != 0) { @@ -79,36 +79,36 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } -static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { +static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } -int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); } -int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { return qmPutNodeMsgToWorker(&pMgmt->fetchWorker, pMsg); } -int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { return qmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); } static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) { return -1; } dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); - pMsg->rpcMsg = *pRpc; + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); taosWriteQitem(pWorker->queue, pMsg); return 0; } diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index a1ab9ba0776e206fd439e5389fb2b97d2d4899f1..5d112f51a44420aa6d041ddf14b300234db35855 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -39,18 +39,18 @@ typedef struct SSnodeMgmt { // smHandle.c SArray *smGetMsgHandles(); -int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg); -int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); void smStopWorker(SSnodeMgmt *pMgmt); -int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index d43129b27178993184d293c2e198be6d115bc059..76f25af994d791bff1e13ca1bfb9a9f8172989d9 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -18,7 +18,7 @@ static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonSmInfo smInfo = {0}; smGetMonitorInfo(pMgmt, &smInfo); dmGetMonitorSystemInfo(&smInfo.sys); @@ -37,14 +37,14 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; tFreeSMonSmInfo(&smInfo); return 0; } -int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDCreateSnodeReq createReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { @@ -67,8 +67,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) { return 0; } -int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDDropSnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 90e20f5fc5d0c6fbed35b940c5066b069ded84e5..1204e6884c66835d403ddec1ae223589cfc31d61 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -16,26 +16,26 @@ #define _DEFAULT_SOURCE #include "smInt.h" -static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) { +static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, + .info.refId = pMsg->info.refId, .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen, + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, }; tmsgSendRsp(&rsp); } -static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from snode-monitor queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; + SRpcMsg *pRpc = pMsg; int32_t code = -1; - if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { + if (pMsg->msgType == TDMT_MON_SM_INFO) { code = smProcessGetMonitorInfoReq(pMgmt, pMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; @@ -55,26 +55,26 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num SSnodeMgmt *pMgmt = pInfo->ahandle; for (int32_t i = 0; i < numOfMsgs; i++) { - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; taosGetQitem(qall, (void **)&pMsg); dTrace("msg:%p, get from snode-unique queue", pMsg); - sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg); + sndProcessUMsg(pMgmt->pSnode, pMsg); dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } -static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SSnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from snode-shared queue", pMsg); - sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg); + sndProcessSMsg(pMgmt->pSnode, pMsg); dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -161,7 +161,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; @@ -173,7 +173,7 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -181,8 +181,8 @@ int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); +int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t index = smGetSWIdFromMsg(pMsg); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; @@ -194,7 +194,7 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -202,8 +202,8 @@ int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { return 0; } -int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); +int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { + int32_t workerType = smGetSWTypeFromMsg(pMsg); if (workerType == SND_WORKER_TYPE__SHARED) { return smPutNodeMsgToSharedQueue(pMgmt, pMsg); } else { diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index ca43ef8c223a9675ee4b150655623e37fccf4957..db72781be1fc6a2bcb79fc02f7fd3b2889ad361b 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); // vmHandle.c SArray *vmGetMsgHandles(); -int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); @@ -108,13 +108,13 @@ int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype); -int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0814568b7324af50ad7847b4ab910e6d42752d92..423c7671910b805383b60f278fc77652ad3bd532 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -82,7 +82,7 @@ static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonVmInfo vmInfo = {0}; vmGetMonitorInfo(pMgmt, &vmInfo); dmGetMonitorSystemInfo(&vmInfo.sys); @@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; tFreeSMonVmInfo(&vmInfo); return 0; } -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) { +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { SMonVloadInfo vloads = {0}; vmGetVnodeLoads(pMgmt, &vloads); @@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) { } tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; + pReq->info.rsp = pRsp; + pReq->info.rspLen = rspLen; tFreeSMonVloadInfo(&vloads); return 0; } @@ -173,8 +173,8 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId); } -int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SCreateVnodeReq createReq = {0}; int32_t code = -1; char path[TSDB_FILENAME_LEN] = {0}; @@ -241,8 +241,8 @@ _OVER: return code; } -int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; +int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SRpcMsg *pReq = pMsg; SDropVnodeReq dropReq = {0}; if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index d6e99c0899b24e6b312960ba2fa86f2b070e5b9c..813c0ed0fb42c97114e1cbd6bca73b4dd9642851 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -19,24 +19,24 @@ #include "sync.h" #include "syncTools.h" -static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) { +static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rsp = { - .handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, + .info.refId = pMsg->info.refId, .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen, + .pCont = pMsg->info.rsp, + .contLen = pMsg->info.rspLen, }; tmsgSendRsp(&rsp); } -static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode-mgmt/monitor queue", pMsg); + tmsg_t msgType = pMsg->msgType; + dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(msgType)); switch (msgType) { case TDMT_MON_VM_INFO: @@ -62,36 +62,36 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } -static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; dTrace("msg:%p, will be processed in vnode-query queue", pMsg); - int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); + int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg); if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } -static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { +static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeObj *pVnode = pInfo->ahandle; dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } @@ -100,14 +100,14 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO SVnodeObj *pVnode = pInfo->ahandle; SRpcMsg rsp; - SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); if (pArray == NULL) { dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); return; } for (int32_t i = 0; i < numOfMsgs; ++i) { - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; dTrace("msg:%p, will be processed in vnode-write queue", pMsg); @@ -118,15 +118,15 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } for (int i = 0; i < taosArrayGetSize(pArray); i++) { - SNodeMsg *pMsg; + SRpcMsg *pMsg; SRpcMsg *pRpc; - pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); - pRpc = &pMsg->rpcMsg; + pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + pRpc = pMsg; - rsp.ahandle = pRpc->ahandle; - rsp.handle = pRpc->handle; - rsp.refId = pRpc->refId; + rsp.info.ahandle = pRpc->info.ahandle; + rsp.info.handle = pRpc->info.handle; + rsp.info.refId = pRpc->info.refId; rsp.pCont = NULL; rsp.contLen = 0; @@ -153,9 +153,9 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } for (int32_t i = 0; i < numOfMsgs; i++) { - SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); dTrace("msg:%p, is freed", pMsg); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -164,7 +164,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; SRpcMsg rsp; for (int32_t i = 0; i < numOfMsgs; ++i) { @@ -176,8 +176,8 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rsp.contLen = 0; // get original rpc msg - assert(pMsg->rpcMsg.msgType == TDMT_VND_SYNC_APPLY_MSG); - SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(&pMsg->rpcMsg); + assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG); + SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg); syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg); SRpcMsg originalRpcMsg; syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg); @@ -192,56 +192,56 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO rpcFreeCont(originalRpcMsg.pCont); // if leader, send response - if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) { - rsp.ahandle = pMsg->rpcMsg.ahandle; - rsp.handle = pMsg->rpcMsg.handle; - rsp.refId = pMsg->rpcMsg.refId; + if (pMsg->info.handle != NULL && pMsg->info.ahandle != NULL) { + rsp.info.ahandle = pMsg->info.ahandle; + rsp.info.handle = pMsg->info.handle; + rsp.info.refId = pMsg->info.refId; tmsgSendRsp(&rsp); } - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); // todo SRpcMsg *pRsp = NULL; - (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnodeObj *pVnode = pInfo->ahandle; - SNodeMsg *pMsg = NULL; + SRpcMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; vmSendRsp(pMsg, code); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } } } -static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { - SRpcMsg *pRpc = &pMsg->rpcMsg; +static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { + SRpcMsg *pRpc = pMsg; SMsgHead *pHead = pRpc->pCont; int32_t code = 0; @@ -285,34 +285,34 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType return code; } -int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } -int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } -int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } -int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } -int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); } -int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); return 0; } -int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name); @@ -326,13 +326,13 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); int32_t code = 0; if (pMsg != NULL) { dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); - pMsg->rpcMsg = *pRpc; - // if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0); + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + // if (pMsg->handle != NULL) assert(pMsg->refId != 0); switch (qtype) { case WRITE_QUEUE: dTrace("msg:%p, will be put into vnode-write queue", pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 8156e6c512132337be9de46497b1327661cc3947..b994b0f8cb59fe61449ab7ccd3da2bf9f7e31ea9 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -107,9 +107,9 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); _exit: - rpcMsg.handle = pMsg->handle; - rpcMsg.ahandle = pMsg->ahandle; - rpcMsg.refId = pMsg->refId; + rpcMsg.info.handle = pMsg->info.handle; + rpcMsg.info.ahandle = pMsg->info.ahandle; + rpcMsg.info.refId = pMsg->info.refId; rpcMsg.pCont = pRsp; rpcMsg.contLen = rspLen; rpcMsg.code = code; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4f76bc5386d88b56ebe7575ef848066591a01614..7414da6bbc4085ae67bdd67168a2436a9dfbd47c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -25,13 +25,13 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { #if 0 - SNodeMsg *pMsg; + SRpcMsg *pMsg; SRpcMsg *pRpc; *version = pVnode->state.processed; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { - pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); - pRpc = &pMsg->rpcMsg; + pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); + pRpc = pMsg; // set request version if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f0f5338c4d22e711ade5d474c69fd917378c785f..f6809c7d8b78fb14f59644b403bcc78db9a71738 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -72,7 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t ret = 0; SMsgCb *pMsgCb = rpcHandle; if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { - pMsg->noResp = 1; + pMsg->info.noResp = 1; tmsgSendReq(rpcHandle, pEpSet, pMsg); } else { vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); @@ -127,12 +127,12 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb SRpcMsg saveRpcMsg; int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg); if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) { - applyMsg.handle = saveRpcMsg.handle; - applyMsg.ahandle = saveRpcMsg.ahandle; - applyMsg.refId = saveRpcMsg.refId; + applyMsg.info.handle = saveRpcMsg.info.handle; + applyMsg.info.ahandle = saveRpcMsg.info.ahandle; + applyMsg.info.refId = saveRpcMsg.info.refId; } else { - applyMsg.handle = NULL; - applyMsg.ahandle = NULL; + applyMsg.info.handle = NULL; + applyMsg.info.ahandle = NULL; } // put to applyQ diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 556f094528003921589443b784814419335c1066..e12040749deaf21287ce968c0b9dea089d968622 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2826,8 +2826,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { - SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle; - assert(pMsg->ahandle != NULL); + SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; + assert(pMsg->info.ahandle != NULL); SDataBuf buf = {.len = pMsg->contLen, .pData = NULL}; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4c05358a13cbbf4195363b231f7854dcc0405ab3..58886d706af0e1091ddb9ac805deb50510d85a37 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -149,9 +149,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra SRpcMsg rpcMsg = {.msgType = pInfo->msgType, .pCont = pMsg, .contLen = pInfo->msgInfo.len, - .ahandle = (void*)pInfo, - .handle = pInfo->msgInfo.handle, - .persistHandle = persistHandle, + .info.ahandle = (void*)pInfo, + .info.handle = pInfo->msgInfo.handle, + .info.persistHandle = persistHandle, .code = 0}; assert(pInfo->fp != NULL); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index d4f6c2fd004f33018a74f9dd6ea779c5d7cfea8c..02707ace6a3c51df27826bea35049aba21c551cf 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -52,9 +52,9 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .pCont = msg, .contLen = contLen, .code = code, @@ -71,9 +71,9 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { SRpcMsg rpcRsp = { .msgType = TDMT_VND_RES_READY_RSP, - .handle = pConn->handle, - .refId = pConn->refId, - .ahandle = NULL, + .info.handle = pConn->handle, + .info.refId = pConn->refId, + .info.ahandle = NULL, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -93,9 +93,9 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, SRpcMsg rpcRsp = { .msgType = TDMT_VND_EXPLAIN_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .pCont = pRsp, .contLen = contLen, .code = 0, @@ -113,9 +113,9 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_ SRpcMsg rpcRsp = { .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .pCont = pRsp, .contLen = contLen, .code = code, @@ -135,9 +135,9 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 SRpcMsg rpcRsp = { .msgType = TDMT_VND_FETCH_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .pCont = pRsp, .contLen = sizeof(*pRsp) + dataLength, .code = code, @@ -154,9 +154,9 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { SRpcMsg rpcRsp = { .msgType = TDMT_VND_CANCEL_TASK_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -172,9 +172,9 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { SRpcMsg rpcRsp = { .msgType = TDMT_VND_DROP_TASK_RSP, - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -228,9 +228,9 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { tSerializeSShowRsp(pBuf, bufLen, &showRsp); SRpcMsg rpcMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .refId = pMsg->refId, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, + .info.refId = pMsg->info.refId, .pCont = pBuf, .contLen = bufLen, .code = code, @@ -246,9 +246,9 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe pRsp->numOfRows = 0; SRpcMsg rpcMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .refId = pMsg->refId, + .info.handle = pMsg->info.handle, + .info.ahandle = pMsg->info.ahandle, + .info.refId = pMsg->info.refId, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = 0, @@ -271,10 +271,10 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { req->taskId = tId; SRpcMsg pNewMsg = { - .handle = pConn->handle, - .ahandle = pConn->ahandle, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, .msgType = TDMT_VND_QUERY_CONTINUE, - .refId = pConn->refId, + .info.refId = pConn->refId, .pCont = req, .contLen = sizeof(SQueryContinueReq), .code = 0, @@ -306,9 +306,9 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { req->refId = htobe64(rId); SRpcMsg pMsg = { - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .msgType = TDMT_VND_DROP_TASK, .pCont = req, .contLen = sizeof(STaskDropReq), @@ -342,9 +342,9 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pCo } SRpcMsg pMsg = { - .handle = pConn->handle, - .ahandle = pConn->ahandle, - .refId = pConn->refId, + .info.handle = pConn->handle, + .info.ahandle = pConn->ahandle, + .info.refId = pConn->refId, .msgType = TDMT_VND_QUERY_HEARTBEAT, .pCont = msg, .contLen = msgSize, @@ -383,12 +383,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = msg->refId; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; char *sql = strndup(msg->msg, msg->sqlLen); - QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql); + QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql); taosMemoryFreeClear(sql); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain)); @@ -418,11 +418,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; - QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle); + QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle); QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg)); @@ -453,11 +453,11 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; - QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle); + QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->info.handle); QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg)); @@ -516,11 +516,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = 0; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; - QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle); + QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); @@ -558,9 +558,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = msg->refId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; // QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); @@ -597,15 +597,15 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int64_t rId = msg->refId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code)); } - QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle); + QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle); QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); @@ -636,15 +636,15 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t sId = req.sId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; - qwMsg.connInfo.handle = pMsg->handle; - qwMsg.connInfo.ahandle = pMsg->ahandle; - qwMsg.connInfo.refId = pMsg->refId; + qwMsg.connInfo.handle = pMsg->info.handle; + qwMsg.connInfo.ahandle = pMsg->info.ahandle; + qwMsg.connInfo.refId = pMsg->info.refId; if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); } - QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); + QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle); QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 08093c8b184a47fa5e65d77ce4f5a56fe5d86ff6..82b4a6a466e395fa21966f28aedb172217a6a072 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -75,7 +75,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg->contLen = tlen; pMsg->code = 0; pMsg->msgType = pTask->dispatchMsgType; - pMsg->noResp = 1; + pMsg->info.noResp = 1; return 0; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index a3d1717c3bcabb22f910a821755c046fd7dcd6b0..e0551132773f52dc1502c15b8d975b5e15303118 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -80,8 +80,8 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { syncUtilMsgHtoN(pMsg->pCont); } - pMsg->handle = NULL; - pMsg->noResp = 1; + pMsg->info.handle = NULL; + pMsg->info.noResp = 1; rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); return ret; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index da23d8415bdc6f482f3db915487d8029954e96e2..5bc7d04dc9789a0a5297d8cd41b8b11f09483325 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -119,7 +119,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg); SRpcMsg rpcMsg = {0}; rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE; - rpcMsg.noResp = 1; + rpcMsg.info.noResp = 1; rpcMsg.contLen = strlen(configChange) + 1; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange); @@ -667,7 +667,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); if (pSyncNode->FpSendMsg != NULL) { - pMsg->noResp = 1; + pMsg->info.noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); @@ -682,7 +682,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S SEpSet epSet; syncUtilnodeInfo2EpSet(nodeInfo, &epSet); if (pSyncNode->FpSendMsg != NULL) { - pMsg->noResp = 1; + pMsg->info.noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index d22c37d75df081bbb17ce8c47af07e5864562540..495b82bed746567b1eaaad453ef5dc4ce829badd 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -20,8 +20,8 @@ void syncRespMgrInsert(uint64_t count) { memset(&stub, 0, sizeof(SRespStub)); stub.createTime = taosGetTimestampMs(); stub.rpcMsg.code = (pMgr->seqNum + 1); - stub.rpcMsg.ahandle = (void *)(200 + i); - stub.rpcMsg.handle = (void *)(300 + i); + stub.rpcMsg.info.ahandle = (void *)(200 + i); + stub.rpcMsg.info.handle = (void *)(300 + i); uint64_t ret = syncRespMgrAdd(pMgr, &stub); printf("insert %lu \n", ret); } @@ -36,7 +36,7 @@ void syncRespMgrDelTest(uint64_t begin, uint64_t end) { void printStub(SRespStub *p) { printf("createTime:%ld, rpcMsg.code:%d rpcMsg.ahandle:%ld rpcMsg.handle:%ld \n", p->createTime, p->rpcMsg.code, - (int64_t)(p->rpcMsg.ahandle), (int64_t)(p->rpcMsg.handle)); + (int64_t)(p->rpcMsg.info.ahandle), (int64_t)(p->rpcMsg.info.handle)); } void syncRespMgrPrint() { printf("\n----------------syncRespMgrPrint--------------\n"); diff --git a/source/libs/transport/test/syncClient.c b/source/libs/transport/test/syncClient.c index 3f1a7805b43d7d4d18c065e59338cd3b6b648709..801aa0fd74bd1bbb61020be51efe2f0170ac1d24 100644 --- a/source/libs/transport/test/syncClient.c +++ b/source/libs/transport/test/syncClient.c @@ -32,7 +32,7 @@ typedef struct { void * pRpc; } SInfo; static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SInfo *pInfo = (SInfo *)pMsg->ahandle; + SInfo *pInfo = (SInfo *)pMsg->info.ahandle; tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); @@ -61,7 +61,7 @@ static void *sendRequest(void *param) { pInfo->num++; rpcMsg.pCont = rpcMallocCont(pInfo->msgSize); rpcMsg.contLen = pInfo->msgSize; - rpcMsg.ahandle = pInfo; + rpcMsg.info.ahandle = pInfo; rpcMsg.msgType = 1; // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); int64_t start = taosGetTimestampUs();