未验证 提交 7982dc96 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12705 from taosdata/fix/dnode

refactor: adjust msgcb
...@@ -36,9 +36,9 @@ typedef struct SBnodeMgmt { ...@@ -36,9 +36,9 @@ typedef struct SBnodeMgmt {
// bmHandle.c // bmHandle.c
SArray *bmGetMsgHandles(); SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg);
// bmWorker.c // bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt); int32_t bmStartWorker(SBnodeMgmt *pMgmt);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonBmInfo bmInfo = {0}; SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pMgmt, &bmInfo); bmGetMonitorInfo(pMgmt, &bmInfo);
dmGetMonitorSystemInfo(&bmInfo.sys); dmGetMonitorSystemInfo(&bmInfo.sys);
...@@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo); tFreeSMonBmInfo(&bmInfo);
return 0; return 0;
} }
int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateBnodeReq createReq = {0}; SDCreateBnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropBnodeReq dropReq = {0}; SDDropBnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -41,18 +41,18 @@ typedef struct SMnodeMgmt { ...@@ -41,18 +41,18 @@ typedef struct SMnodeMgmt {
// mmFile.c // mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
// mmWorker.c // mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
...@@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); ...@@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -104,7 +104,7 @@ _OVER: ...@@ -104,7 +104,7 @@ _OVER:
return code; return code;
} }
int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
...@@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { ...@@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) { for (int32_t i = 0; i < replica; ++i) {
SReplica *pReplica = &pMgmt->replicas[i]; SReplica *pReplica = &pMgmt->replicas[i];
if (pReq != NULL) { if (pMsg != NULL) {
pReplica = &pReq->replicas[i]; pReplica = &pMsg->replicas[i];
} }
len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
......
...@@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { ...@@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
mndGetLoad(pMgmt->pMnode, &pInfo->load); mndGetLoad(pMgmt->pMnode, &pInfo->load);
} }
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMmInfo mmInfo = {0}; SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pMgmt, &mmInfo); mmGetMonitorInfo(pMgmt, &mmInfo);
dmGetMonitorSystemInfo(&mmInfo.sys); dmGetMonitorSystemInfo(&mmInfo.sys);
...@@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo); tFreeSMonMmInfo(&mmInfo);
return 0; return 0;
} }
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonMloadInfo mloads = {0}; SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads); mmGetMnodeLoads(pMgmt, &mloads);
...@@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); tSerializeSMonMloadInfo(pRsp, rspLen, &mloads);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
return 0; return 0;
} }
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropMnodeReq dropReq = {0}; SDDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDAlterMnodeReq alterReq = {0}; SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
return 0; return 0;
} }
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) { if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) {
return -1; return -1;
} }
...@@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { ...@@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
} }
bool deployed = true; bool deployed = true;
if (mmWriteFile(pMgmt, pReq, deployed) != 0) { if (mmWriteFile(pMgmt, pMsg, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
bool deployed = false; bool deployed = false;
......
...@@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
} }
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = { SSingleWorkerCfg qCfg = {
......
...@@ -39,7 +39,7 @@ typedef struct SQnodeMgmt { ...@@ -39,7 +39,7 @@ typedef struct SQnodeMgmt {
SArray *qmGetMsgHandles(); SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
// qmWorker.c // qmWorker.c
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonQmInfo qmInfo = {0}; SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pMgmt, &qmInfo); qmGetMonitorInfo(pMgmt, &qmInfo);
dmGetMonitorSystemInfo(&qmInfo.sys); dmGetMonitorSystemInfo(&qmInfo.sys);
...@@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo); tFreeSMonQmInfo(&qmInfo);
return 0; return 0;
} }
int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateQnodeReq createReq = {0}; SDCreateQnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropQnodeReq dropReq = {0}; SDDropQnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt { ...@@ -40,7 +40,7 @@ typedef struct SSnodeMgmt {
SArray *smGetMsgHandles(); SArray *smGetMsgHandles();
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
// smWorker.c // smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt); int32_t smStartWorker(SSnodeMgmt *pMgmt);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonSmInfo smInfo = {0}; SMonSmInfo smInfo = {0};
smGetMonitorInfo(pMgmt, &smInfo); smGetMonitorInfo(pMgmt, &smInfo);
dmGetMonitorSystemInfo(&smInfo.sys); dmGetMonitorSystemInfo(&smInfo.sys);
...@@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonSmInfo(&smInfo); tFreeSMonSmInfo(&smInfo);
return 0; return 0;
} }
int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDCreateSnodeReq createReq = {0}; SDCreateSnodeReq createReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { ...@@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
} }
int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDDropSnodeReq dropReq = {0}; SDDropSnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); ...@@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
// vmHandle.c // vmHandle.c
SArray *vmGetMsgHandles(); SArray *vmGetMsgHandles();
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
// vmFile.c // vmFile.c
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
......
...@@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { ...@@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
taosArrayDestroy(pVloads); taosArrayDestroy(pVloads);
} }
int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVmInfo vmInfo = {0}; SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pMgmt, &vmInfo); vmGetMonitorInfo(pMgmt, &vmInfo);
dmGetMonitorSystemInfo(&vmInfo.sys); dmGetMonitorSystemInfo(&vmInfo.sys);
...@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo); tFreeSMonVmInfo(&vmInfo);
return 0; return 0;
} }
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVloadInfo vloads = {0}; SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads); vmGetVnodeLoads(pMgmt, &vloads);
...@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { ...@@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) {
} }
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->info.rsp = pRsp; pMsg->info.rsp = pRsp;
pReq->info.rspLen = rspLen; pMsg->info.rspLen = rspLen;
tFreeSMonVloadInfo(&vloads); tFreeSMonVloadInfo(&vloads);
return 0; return 0;
} }
...@@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW ...@@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
} }
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SCreateVnodeReq createReq = {0}; SCreateVnodeReq createReq = {0};
int32_t code = -1; int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -242,9 +241,8 @@ _OVER: ...@@ -242,9 +241,8 @@ _OVER:
} }
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SRpcMsg *pReq = pMsg;
SDropVnodeReq dropReq = {0}; SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
......
...@@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { ...@@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(&pDnode->data, &epSet); dmGetMnodeEpSet(&pDnode->data, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
...@@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { ...@@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
SRpcMsg rsp = { SRpcMsg rsp = {
.code = TSDB_CODE_RPC_REDIRECT, .code = TSDB_CODE_RPC_REDIRECT,
.info = pReq->info, .info = pMsg->info,
.contLen = len, .contLen = len,
}; };
rsp.pCont = rpcMallocCont(len); rsp.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rsp.pCont, len, &msg); tSerializeSMEpSet(rsp.pCont, len, &msg);
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
rpcFreeCont(pMsg->pCont);
} }
static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
...@@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { ...@@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
} }
} }
static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
if (pDnode->status != DND_STAT_RUNNING) { if (pDnode->status != DND_STAT_RUNNING) {
rpcFreeCont(pReq->pCont); rpcFreeCont(pMsg->pCont);
pReq->pCont = NULL; pMsg->pCont = NULL;
terrno = TSDB_CODE_NODE_OFFLINE; terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle); dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle);
return -1; return -1;
} else { } else {
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL);
return 0; return 0;
} }
} }
......
...@@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config); mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle);
tmsgSendReq(&epSet, &rpcMsg); return tmsgSendReq(&epSet, &rpcMsg);
return 0;
} }
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
mInfo("app:%p config rsp from dnode", pRsp->info.ahandle); mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册