提交 fc94b71c 编写于 作者: S Shengliang Guan

refactor: adjust SRpcMsg

上级 96b7f269
......@@ -583,8 +583,8 @@ bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
assert(pMsg->ahandle != NULL);
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
if (pSendInfo->requestObjRefId != 0) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
......@@ -615,7 +615,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
}
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
......@@ -956,7 +956,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
void* clientRpc = NULL;
SServerStatusRsp statusRsp = {0};
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
SRpcMsg rpcMsg = {.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
SRpcMsg rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
SRpcMsg rpcRsp = {0};
SRpcInit rpcInit = {0};
char pass[TSDB_PASSWORD_LEN + 1] = {0};
......
......@@ -36,15 +36,15 @@ typedef struct SBnodeMgmt {
// bmHandle.c
SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq);
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq);
// bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt);
void bmStopWorker(SBnodeMgmt *pMgmt);
int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -18,7 +18,7 @@
static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pMgmt, &bmInfo);
dmGetMonitorSystemInfo(&bmInfo.sys);
......@@ -37,14 +37,14 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo);
return 0;
}
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateBnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
......@@ -67,8 +67,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return 0;
}
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropBnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......
......@@ -16,23 +16,23 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) {
static void bmSendErrorRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.code = code,
.refId = pMsg->rpcMsg.refId,
.info.refId = pMsg->info.refId,
};
tmsgSendRsp(&rpcRsp);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
if (pMsg != NULL) {
bmSendErrorRsp(pMsg, code);
......@@ -40,24 +40,24 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
}
}
static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,};
tmsgSendRsp(&rsp);
}
static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SBnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from bnode-monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg *pRpc = pMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) {
if (pMsg->msgType == TDMT_MON_BM_INFO) {
code = bmProcessGetMonBmInfoReq(pMgmt, pMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......@@ -76,14 +76,14 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SBnodeMgmt *pMgmt = pInfo->ahandle;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) {
bmSendErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return;
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
if (pMsg != NULL) {
dTrace("msg:%p, get from bnode-write queue", pMsg);
......@@ -96,17 +96,17 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
bndProcessWMsgs(pMgmt->pBnode, pArray);
for (size_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
if (pMsg != NULL) {
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
taosArrayDestroy(pArray);
}
int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......@@ -114,7 +114,7 @@ int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
......@@ -39,10 +39,10 @@ typedef struct SDnodeMgmt {
// dmHandle.c
SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmMonitor.c
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo);
......@@ -50,7 +50,7 @@ void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo);
void dmSendMonitorReport(SDnodeMgmt *pMgmt);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
void dmStopStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
......
......@@ -83,29 +83,25 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
tSerializeSStatusReq(pHead, contLen, &req);
tFreeSStatusReq(&req);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527};
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
SRpcMsg rpcRsp = {0};
dTrace("send status msg to mnode, app:%p", rpcMsg.ahandle);
dTrace("send status msg to mnode, app:%p", rpcMsg.info.ahandle);
tmsgSendMnodeRecv(&rpcMsg, &rpcRsp);
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pRsp = &pMsg->rpcMsg;
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dError("auth rsp is received, but not supported yet");
return 0;
}
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pRsp = &pMsg->rpcMsg;
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dError("grant rsp is received, but not supported yet");
return 0;
}
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDCfgDnodeReq *pCfg = pReq->pCont;
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dError("config req is received, but not supported yet");
return TSDB_CODE_OPS_NOT_SUPPORT;
}
......@@ -139,12 +135,12 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
taosArrayDestroy(vinfo.pVloads);
}
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dDebug("server run status req is received");
SServerStatusRsp statusRsp = {0};
dmGetServerRunStatus(pMgmt, &statusRsp);
SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .refId = pMsg->rpcMsg.refId};
SRpcMsg rspMsg = {.info.handle = pMsg->info.handle, .info.ahandle = pMsg->info.ahandle, .info.refId = pMsg->info.refId};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -158,8 +154,8 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
pMsg->pRsp = pRsp;
pMsg->rspLen = rspLen;
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspLen;
return 0;
}
......
......@@ -98,10 +98,10 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
}
}
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SDnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
tmsg_t msgType = pMsg->msgType;
bool isRequest = msgType & 1u;
dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType));
......@@ -150,18 +150,18 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (isRequest) {
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.code = code,
.refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
.info.refId = pMsg->info.refId,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
};
rpcSendResponse(&rsp);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -187,7 +187,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
dDebug("dnode workers are closed");
}
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
......
......@@ -48,20 +48,20 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
// mmHandle.c
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq);
// mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
......
......@@ -20,7 +20,7 @@ static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pMgmt, &mmInfo);
dmGetMonitorSystemInfo(&mmInfo.sys);
......@@ -39,8 +39,8 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo);
return 0;
}
......@@ -50,7 +50,7 @@ static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads);
......@@ -67,13 +67,13 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
return 0;
}
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
......@@ -100,8 +100,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return 0;
}
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......@@ -124,8 +124,8 @@ int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
......
......@@ -16,26 +16,22 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
.info = pMsg->info,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
};
tmsgSendRsp(&rsp);
}
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
bool isRequest = msgType & 1U;
dTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(msgType));
dTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
switch (msgType) {
switch (pMsg->msgType) {
case TDMT_DND_ALTER_MNODE:
code = mmProcessAlterReq(pMgmt, pMsg);
break;
......@@ -46,69 +42,67 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
code = mmProcessGetLoadsReq(pMgmt, pMsg);
break;
default:
pMsg->pNode = pMgmt->pMnode;
pMsg->info.node = pMgmt->pMnode;
code = mndProcessMsg(pMsg);
}
if (isRequest) {
if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
}
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
tmsg_t msgType = pMsg->msgType;
bool isRequest = msgType & 1U;
dTrace("msg:%p, get from mnode-query queue", pMsg);
pMsg->pNode = pMgmt->pMnode;
pMsg->info.node = pMgmt->pMnode;
code = mndProcessMsg(pMsg);
if (isRequest) {
if (pMsg->rpcMsg.handle != NULL && code != 0) {
if (pMsg->info.handle != NULL && code != 0) {
if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code);
}
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->rpcMsg.msgType));
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); }
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); }
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); }
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); }
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
}
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
}
static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......
......@@ -37,9 +37,9 @@ typedef struct SQnodeMgmt {
// qmHandle.c
SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq);
// qmWorker.c
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
......@@ -48,9 +48,9 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype);
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
void qmStopWorker(SQnodeMgmt *pMgmt);
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -18,7 +18,7 @@
static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pMgmt, &qmInfo);
dmGetMonitorSystemInfo(&qmInfo.sys);
......@@ -37,14 +37,14 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo);
return 0;
}
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateQnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
......@@ -67,8 +67,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return 0;
}
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropQnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......
......@@ -16,26 +16,26 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
};
tmsgSendRsp(&rsp);
}
static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void qmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from qnode-monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg *pRpc = pMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_QM_INFO) {
if (pMsg->msgType == TDMT_MON_QM_INFO) {
code = qmProcessGetMonitorInfoReq(pMgmt, pMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......@@ -51,11 +51,11 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void qmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from qnode-query queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg *pRpc = pMsg;
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc);
if (pRpc->msgType & 1U && code != 0) {
......@@ -63,15 +63,15 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void qmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from qnode-fetch queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg *pRpc = pMsg;
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc);
if (pRpc->msgType & 1U && code != 0) {
......@@ -79,36 +79,36 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
}
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return qmPutNodeMsgToWorker(&pMgmt->fetchWorker, pMsg);
}
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return qmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
}
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) {
return -1;
}
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
......
......@@ -39,18 +39,18 @@ typedef struct SSnodeMgmt {
// smHandle.c
SArray *smGetMsgHandles();
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq);
// smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt);
void smStopWorker(SSnodeMgmt *pMgmt);
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -18,7 +18,7 @@
static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pMgmt, &smInfo);
dmGetMonitorSystemInfo(&smInfo.sys);
......@@ -37,14 +37,14 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
tFreeSMonSmInfo(&smInfo);
return 0;
}
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateSnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
......@@ -67,8 +67,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SNodeMsg *pMsg) {
return 0;
}
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......
......@@ -16,26 +16,26 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
};
tmsgSendRsp(&rsp);
}
static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from snode-monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
SRpcMsg *pRpc = pMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
if (pMsg->msgType == TDMT_MON_SM_INFO) {
code = smProcessGetMonitorInfoReq(pMgmt, pMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......@@ -55,26 +55,26 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
SSnodeMgmt *pMgmt = pInfo->ahandle;
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, get from snode-unique queue", pMsg);
sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg);
sndProcessUMsg(pMgmt->pSnode, pMsg);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void smProcessSharedQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from snode-shared queue", pMsg);
sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
sndProcessSMsg(pMgmt->pSnode, pMsg);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -161,7 +161,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
return 0;
}
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -173,7 +173,7 @@ int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......@@ -181,8 +181,8 @@ int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t index = smGetSWIdFromMsg(pMsg);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -194,7 +194,7 @@ int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......@@ -202,8 +202,8 @@ int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return 0;
}
int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg);
int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t workerType = smGetSWTypeFromMsg(pMsg);
if (workerType == SND_WORKER_TYPE__SHARED) {
return smPutNodeMsgToSharedQueue(pMgmt, pMsg);
} else {
......
......@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
// vmHandle.c
SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq);
// vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......@@ -108,13 +108,13 @@ int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype);
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -82,7 +82,7 @@ static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads);
}
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pMgmt, &vmInfo);
dmGetMonitorSystemInfo(&vmInfo.sys);
......@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo);
return 0;
}
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) {
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads);
......@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SNodeMsg *pReq) {
}
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
pReq->info.rsp = pRsp;
pReq->info.rspLen = rspLen;
tFreeSMonVloadInfo(&vloads);
return 0;
}
......@@ -173,8 +173,8 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
}
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SCreateVnodeReq createReq = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
......@@ -241,8 +241,8 @@ _OVER:
return code;
}
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......
......@@ -19,24 +19,24 @@
#include "sync.h"
#include "syncTools.h"
static inline void vmSendRsp(SNodeMsg *pMsg, int32_t code) {
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.refId = pMsg->rpcMsg.refId,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
};
tmsgSendRsp(&rsp);
}
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode-mgmt/monitor queue", pMsg);
tmsg_t msgType = pMsg->msgType;
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(msgType));
switch (msgType) {
case TDMT_MON_VM_INFO:
......@@ -62,36 +62,36 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void vmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, pMsg);
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SVnodeObj *pVnode = pInfo->ahandle;
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
......@@ -100,14 +100,14 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SVnodeObj *pVnode = pInfo->ahandle;
SRpcMsg rsp;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
return;
}
for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
......@@ -118,15 +118,15 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SNodeMsg *pMsg;
SRpcMsg *pMsg;
SRpcMsg *pRpc;
pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
pRpc = &pMsg->rpcMsg;
pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
pRpc = pMsg;
rsp.ahandle = pRpc->ahandle;
rsp.handle = pRpc->handle;
rsp.refId = pRpc->refId;
rsp.info.ahandle = pRpc->info.ahandle;
rsp.info.handle = pRpc->info.handle;
rsp.info.refId = pRpc->info.refId;
rsp.pCont = NULL;
rsp.contLen = 0;
......@@ -153,9 +153,9 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
}
for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -164,7 +164,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) {
......@@ -176,8 +176,8 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp.contLen = 0;
// get original rpc msg
assert(pMsg->rpcMsg.msgType == TDMT_VND_SYNC_APPLY_MSG);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(&pMsg->rpcMsg);
assert(pMsg->msgType == TDMT_VND_SYNC_APPLY_MSG);
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgFromRpcMsg2(pMsg);
syncApplyMsgLog2("==vmProcessApplyQueue==", pSyncApplyMsg);
SRpcMsg originalRpcMsg;
syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
......@@ -192,56 +192,56 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rpcFreeCont(originalRpcMsg.pCont);
// if leader, send response
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
rsp.ahandle = pMsg->rpcMsg.ahandle;
rsp.handle = pMsg->rpcMsg.handle;
rsp.refId = pMsg->rpcMsg.refId;
if (pMsg->info.handle != NULL && pMsg->info.ahandle != NULL) {
rsp.info.ahandle = pMsg->info.ahandle;
rsp.info.handle = pMsg->info.handle;
rsp.info.refId = pMsg->info.refId;
tmsgSendRsp(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
// todo
SRpcMsg *pRsp = NULL;
(void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
(void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp);
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
SRpcMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg, pInfo);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
vmSendRsp(pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
}
}
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = pMsg;
SMsgHead *pHead = pRpc->pCont;
int32_t code = 0;
......@@ -285,34 +285,34 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg, EQueueType
return code;
}
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
}
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
}
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
}
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
}
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE);
}
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be put into vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, will be put into vnode-monitor queue, worker:%s", pMsg, pWorker->name);
......@@ -326,13 +326,13 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1;
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM);
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
int32_t code = 0;
if (pMsg != NULL) {
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc;
// if (pMsg->rpcMsg.handle != NULL) assert(pMsg->rpcMsg.refId != 0);
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
// if (pMsg->handle != NULL) assert(pMsg->refId != 0);
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, will be put into vnode-write queue", pMsg);
......
......@@ -107,9 +107,9 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
_exit:
rpcMsg.handle = pMsg->handle;
rpcMsg.ahandle = pMsg->ahandle;
rpcMsg.refId = pMsg->refId;
rpcMsg.info.handle = pMsg->info.handle;
rpcMsg.info.ahandle = pMsg->info.ahandle;
rpcMsg.info.refId = pMsg->info.refId;
rpcMsg.pCont = pRsp;
rpcMsg.contLen = rspLen;
rpcMsg.code = code;
......
......@@ -25,13 +25,13 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#if 0
SNodeMsg *pMsg;
SRpcMsg *pMsg;
SRpcMsg *pRpc;
*version = pVnode->state.processed;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
pRpc = &pMsg->rpcMsg;
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
pRpc = pMsg;
// set request version
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
......
......@@ -72,7 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0;
SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
pMsg->noResp = 1;
pMsg->info.noResp = 1;
tmsgSendReq(rpcHandle, pEpSet, pMsg);
} else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
......@@ -127,12 +127,12 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
SRpcMsg saveRpcMsg;
int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
applyMsg.handle = saveRpcMsg.handle;
applyMsg.ahandle = saveRpcMsg.ahandle;
applyMsg.refId = saveRpcMsg.refId;
applyMsg.info.handle = saveRpcMsg.info.handle;
applyMsg.info.ahandle = saveRpcMsg.info.ahandle;
applyMsg.info.refId = saveRpcMsg.info.refId;
} else {
applyMsg.handle = NULL;
applyMsg.ahandle = NULL;
applyMsg.info.handle = NULL;
applyMsg.info.ahandle = NULL;
}
// put to applyQ
......
......@@ -2826,8 +2826,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
}
void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
assert(pMsg->ahandle != NULL);
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
assert(pMsg->info.ahandle != NULL);
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
......
......@@ -149,9 +149,9 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
SRpcMsg rpcMsg = {.msgType = pInfo->msgType,
.pCont = pMsg,
.contLen = pInfo->msgInfo.len,
.ahandle = (void*)pInfo,
.handle = pInfo->msgInfo.handle,
.persistHandle = persistHandle,
.info.ahandle = (void*)pInfo,
.info.handle = pInfo->msgInfo.handle,
.info.persistHandle = persistHandle,
.code = 0};
assert(pInfo->fp != NULL);
......
......@@ -52,9 +52,9 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = msg,
.contLen = contLen,
.code = code,
......@@ -71,9 +71,9 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.handle = pConn->handle,
.refId = pConn->refId,
.ahandle = NULL,
.info.handle = pConn->handle,
.info.refId = pConn->refId,
.info.ahandle = NULL,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -93,9 +93,9 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
......@@ -113,9 +113,9 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp,
.contLen = contLen,
.code = code,
......@@ -135,9 +135,9 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
......@@ -154,9 +154,9 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -172,9 +172,9 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
......@@ -228,9 +228,9 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.refId = pMsg->refId,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.pCont = pBuf,
.contLen = bufLen,
.code = code,
......@@ -246,9 +246,9 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe
pRsp->numOfRows = 0;
SRpcMsg rpcMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.refId = pMsg->refId,
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
......@@ -271,10 +271,10 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.refId = pConn->refId,
.info.refId = pConn->refId,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
......@@ -306,9 +306,9 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
req->refId = htobe64(rId);
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.msgType = TDMT_VND_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
......@@ -342,9 +342,9 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pCo
}
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.refId = pConn->refId,
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = msgSize,
......@@ -383,12 +383,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
char *sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
taosMemoryFreeClear(sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
......@@ -418,11 +418,11 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->handle);
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessCQuery(QW_FPARAMS(), &qwMsg));
......@@ -453,11 +453,11 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->handle);
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessReady(QW_FPARAMS(), &qwMsg));
......@@ -516,11 +516,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->handle);
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg));
......@@ -558,9 +558,9 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
// QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
......@@ -597,15 +597,15 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
}
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->handle);
QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg));
......@@ -636,15 +636,15 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle;
qwMsg.connInfo.refId = pMsg->refId;
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
}
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
......
......@@ -75,7 +75,7 @@ static int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg*
pMsg->contLen = tlen;
pMsg->code = 0;
pMsg->msgType = pTask->dispatchMsgType;
pMsg->noResp = 1;
pMsg->info.noResp = 1;
return 0;
}
......
......@@ -80,8 +80,8 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
syncUtilMsgHtoN(pMsg->pCont);
}
pMsg->handle = NULL;
pMsg->noResp = 1;
pMsg->info.handle = NULL;
pMsg->info.noResp = 1;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return ret;
}
......
......@@ -119,7 +119,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
char *configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_SYNC_CONFIG_CHANGE;
rpcMsg.noResp = 1;
rpcMsg.info.noResp = 1;
rpcMsg.contLen = strlen(configChange) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
......@@ -667,7 +667,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
pMsg->noResp = 1;
pMsg->info.noResp = 1;
// htonl
syncUtilMsgHtoN(pMsg->pCont);
......@@ -682,7 +682,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
pMsg->noResp = 1;
pMsg->info.noResp = 1;
// htonl
syncUtilMsgHtoN(pMsg->pCont);
......
......@@ -20,8 +20,8 @@ void syncRespMgrInsert(uint64_t count) {
memset(&stub, 0, sizeof(SRespStub));
stub.createTime = taosGetTimestampMs();
stub.rpcMsg.code = (pMgr->seqNum + 1);
stub.rpcMsg.ahandle = (void *)(200 + i);
stub.rpcMsg.handle = (void *)(300 + i);
stub.rpcMsg.info.ahandle = (void *)(200 + i);
stub.rpcMsg.info.handle = (void *)(300 + i);
uint64_t ret = syncRespMgrAdd(pMgr, &stub);
printf("insert %lu \n", ret);
}
......@@ -36,7 +36,7 @@ void syncRespMgrDelTest(uint64_t begin, uint64_t end) {
void printStub(SRespStub *p) {
printf("createTime:%ld, rpcMsg.code:%d rpcMsg.ahandle:%ld rpcMsg.handle:%ld \n", p->createTime, p->rpcMsg.code,
(int64_t)(p->rpcMsg.ahandle), (int64_t)(p->rpcMsg.handle));
(int64_t)(p->rpcMsg.info.ahandle), (int64_t)(p->rpcMsg.info.handle));
}
void syncRespMgrPrint() {
printf("\n----------------syncRespMgrPrint--------------\n");
......
......@@ -32,7 +32,7 @@ typedef struct {
void * pRpc;
} SInfo;
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SInfo *pInfo = (SInfo *)pMsg->ahandle;
SInfo *pInfo = (SInfo *)pMsg->info.ahandle;
tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
......@@ -61,7 +61,7 @@ static void *sendRequest(void *param) {
pInfo->num++;
rpcMsg.pCont = rpcMallocCont(pInfo->msgSize);
rpcMsg.contLen = pInfo->msgSize;
rpcMsg.ahandle = pInfo;
rpcMsg.info.ahandle = pInfo;
rpcMsg.msgType = 1;
// tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num);
int64_t start = taosGetTimestampUs();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册