提交 6296188f 编写于 作者: S Shengliang

refactor: node mgmt

上级 ace5940c
...@@ -37,8 +37,8 @@ typedef enum { ...@@ -37,8 +37,8 @@ typedef enum {
QUEUE_MAX, QUEUE_MAX,
} EQueueType; } EQueueType;
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
...@@ -49,6 +49,8 @@ typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const ch ...@@ -49,6 +49,8 @@ typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const ch
typedef struct { typedef struct {
SMgmtWrapper* pWrapper; SMgmtWrapper* pWrapper;
void* pMgmt;
void* clientRpc;
PutToQueueFp queueFps[QUEUE_MAX]; PutToQueueFp queueFps[QUEUE_MAX];
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
...@@ -57,7 +59,6 @@ typedef struct { ...@@ -57,7 +59,6 @@ typedef struct {
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp; ReportStartup reportStartupFp;
void* clientRpc;
} SMsgCb; } SMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
......
...@@ -24,7 +24,7 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; } ...@@ -24,7 +24,7 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
PutToQueueFp fp = pMsgCb->queueFps[qtype]; PutToQueueFp fp = pMsgCb->queueFps[qtype];
if (fp != NULL) { if (fp != NULL) {
return (*fp)(pMsgCb->pWrapper, pReq); return (*fp)(pMsgCb->pMgmt, pReq);
} else { } else {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
return -1; return -1;
...@@ -34,7 +34,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { ...@@ -34,7 +34,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
GetQueueSizeFp fp = pMsgCb->qsizeFp; GetQueueSizeFp fp = pMsgCb->qsizeFp;
if (fp != NULL) { if (fp != NULL) {
return (*fp)(pMsgCb->pWrapper, vgId, qtype); return (*fp)(pMsgCb->pMgmt, vgId, qtype);
} else { } else {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
return -1; return -1;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#define _TD_DND_MNODE_INT_H_ #define _TD_DND_MNODE_INT_H_
#include "dmInt.h" #include "dmInt.h"
#include "mnode.h" #include "mnode.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -26,9 +25,10 @@ extern "C" { ...@@ -26,9 +25,10 @@ extern "C" {
typedef struct SMnodeMgmt { typedef struct SMnodeMgmt {
SMnode *pMnode; SMnode *pMnode;
SDnode *pDnode; SMsgCb msgCb;
SMgmtWrapper *pWrapper;
const char *path; const char *path;
const char *name;
int32_t dnodeId;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
...@@ -41,33 +41,31 @@ typedef struct SMnodeMgmt { ...@@ -41,33 +41,31 @@ typedef struct SMnodeMgmt {
// mmFile.c // mmFile.c
int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed); int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
// mmHandle.c // mmHandle.c
void mmInitMsgHandle(SMgmtWrapper *pWrapper); SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq);
void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo);
// mmWorker.c // mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
void mmStopWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt);
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc);
int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,7 +28,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -28,7 +28,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_READ); pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) { if (pFile == NULL) {
// dDebug("file %s not exist", file);
code = 0; code = 0;
goto _OVER; goto _OVER;
} }
...@@ -105,11 +104,11 @@ _OVER: ...@@ -105,11 +104,11 @@ _OVER:
return code; return code;
} }
int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed) { int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%smnode.json.bak", pWrapper->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%smnode.json", pWrapper->path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -125,7 +124,6 @@ int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deploye ...@@ -125,7 +124,6 @@ int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deploye
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pReq != NULL || pMgmt != NULL) { if (pReq != NULL || pMgmt != NULL) {
int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica);
for (int32_t i = 0; i < replica; ++i) { for (int32_t i = 0; i < replica; ++i) {
......
...@@ -16,15 +16,14 @@ ...@@ -16,15 +16,14 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) { static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant); mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
} }
int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonMmInfo mmInfo = {0}; SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pWrapper, &mmInfo); mmGetMonitorInfo(pMgmt, &mmInfo);
dmGetMonitorSysInfo(&mmInfo.sys); dmGetSystemInfo(&mmInfo.sys);
monGetLogs(&mmInfo.log); monGetLogs(&mmInfo.log);
int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo); int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo);
...@@ -46,15 +45,14 @@ int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { ...@@ -46,15 +45,14 @@ int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return 0; return 0;
} }
void mmGetMnodeLoads(SMgmtWrapper *pWrapper, SMonMloadInfo *pInfo) { static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
pInfo->isMnode = 1; pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load); mndGetLoad(pMgmt->pMnode, &pInfo->load);
} }
int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonMloadInfo mloads = {0}; SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pWrapper, &mloads); mmGetMnodeLoads(pMgmt, &mloads);
int32_t rspLen = tSerializeSMonMloadInfo(NULL, 0, &mloads); int32_t rspLen = tSerializeSMonMloadInfo(NULL, 0, &mloads);
if (rspLen < 0) { if (rspLen < 0) {
...@@ -74,8 +72,7 @@ int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { ...@@ -74,8 +72,7 @@ int32_t mmProcessGetMnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return 0; return 0;
} }
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessCreateReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateMnodeReq createReq = {0}; SDCreateMnodeReq createReq = {0};
...@@ -84,14 +81,14 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -84,14 +81,14 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (createReq.replica <= 1 || createReq.dnodeId != pDnode->data.dnodeId) { if (createReq.replica <= 1 || createReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = true; bool deployed = true;
if (mmWriteFile(pWrapper, &createReq, deployed) != 0) { if (mmWriteFile(pMgmt, &createReq, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -99,8 +96,7 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -99,8 +96,7 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessDropReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropMnodeReq dropReq = {0}; SDDropMnodeReq dropReq = {0};
...@@ -109,14 +105,14 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -109,14 +105,14 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (dropReq.dnodeId != pDnode->data.dnodeId) { if (dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr()); dError("failed to drop mnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (mmWriteFile(pWrapper, NULL, deployed) != 0) { if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -125,7 +121,6 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -125,7 +121,6 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pMgmt->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDAlterMnodeReq alterReq = {0}; SDAlterMnodeReq alterReq = {0};
...@@ -134,104 +129,118 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -134,104 +129,118 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (pDnode->data.dnodeId != 0 && alterReq.dnodeId != pDnode->data.dnodeId) { if (pMgmt->dnodeId != 0 && alterReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pDnode->data.dnodeId); dError("failed to alter mnode since %s, input:%d cur:%d", terrstr(), alterReq.dnodeId, pMgmt->dnodeId);
return -1; return -1;
} else { } else {
return mmAlter(pMgmt, &alterReq); return mmAlter(pMgmt, &alterReq);
} }
} }
void mmInitMsgHandle(SMgmtWrapper *pWrapper) { SArray *mmGetMsgHandles() {
dmSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, 0); int32_t code = -1;
dmSetMsgHandle(pWrapper, TDMT_MON_MM_LOAD, mmProcessMonitorMsg, 0); SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by DNODE // Requests handled by DNODE
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE // Requests handled by MNODE
dmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_USER, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_QNODE_LIST, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_QNODE_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_BNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_MQ_ASK_EP, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
// Requests handled by VNODE // Requests handled by VNODE
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_VNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE_RSP, mmProcessWriteMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
code = 0;
_OVER:
if (code != 0) {
taosArrayDestroy(pArray);
return NULL;
} else {
return pArray;
}
} }
...@@ -17,45 +17,35 @@ ...@@ -17,45 +17,35 @@
#include "mmInt.h" #include "mmInt.h"
#include "wal.h" #include "wal.h"
static bool mmDeployRequired(SDnode *pDnode) { static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
if (pDnode->data.dnodeId > 0) return false; if (pInput->dnodeId > 0) return false;
if (pDnode->data.clusterId > 0) return false; if (pInput->clusterId > 0) return false;
if (strcmp(pDnode->data.localEp, pDnode->data.firstEp) != 0) return false; if (strcmp(pInput->localEp, pInput->firstEp) != 0) return false;
return true; return true;
} }
static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { static int32_t mmRequire(const SMgmtInputOpt *pInput, bool *required) {
SMnodeMgmt mgmt = {0}; SMnodeMgmt mgmt = {0};
mgmt.path = pWrapper->path; mgmt.path = pInput->path;
if (mmReadFile(&mgmt, required) != 0) { if (mmReadFile(&mgmt, required) != 0) {
return -1; return -1;
} }
if (!(*required)) { if (!(*required)) {
*required = mmDeployRequired(pWrapper->pDnode); *required = mmDeployRequired(pInput);
} }
return 0; return 0;
} }
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInput, SMnodeOpt *pOption) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb; pOption->msgCb = pMgmt->msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue;
pOption->msgCb = msgCb;
}
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
mmInitOption(pMgmt, pOption);
pOption->replica = 1; pOption->replica = 1;
pOption->selfIndex = 0; pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pMgmt->pDnode->data.serverPort; pReplica->port = pInput->serverPort;
tstrncpy(pReplica->fqdn, pMgmt->pDnode->data.localFqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, pInput->localFqdn, TSDB_FQDN_LEN);
pOption->deploy = true; pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
...@@ -64,7 +54,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -64,7 +54,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
} }
static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
mmInitOption(pMgmt, pOption); pOption->msgCb = pMgmt->msgCb;
pOption->selfIndex = pMgmt->selfIndex; pOption->selfIndex = pMgmt->selfIndex;
pOption->replica = pMgmt->replica; pOption->replica = pMgmt->replica;
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
...@@ -72,8 +62,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -72,8 +62,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
} }
static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
mmInitOption(pMgmt, pOption); pOption->msgCb = pMgmt->msgCb;
pOption->replica = pCreate->replica; pOption->replica = pCreate->replica;
pOption->selfIndex = -1; pOption->selfIndex = -1;
for (int32_t i = 0; i < pCreate->replica; ++i) { for (int32_t i = 0; i < pCreate->replica; ++i) {
...@@ -81,7 +70,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre ...@@ -81,7 +70,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
pReplica->id = pCreate->replicas[i].id; pReplica->id = pCreate->replicas[i].id;
pReplica->port = pCreate->replicas[i].port; pReplica->port = pCreate->replicas[i].port;
memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pMgmt->pDnode->data.dnodeId) { if (pReplica->id == pMgmt->dnodeId) {
pOption->selfIndex = i; pOption->selfIndex = i;
} }
} }
...@@ -109,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { ...@@ -109,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
} }
bool deployed = true; bool deployed = true;
if (mmWriteFile(pMgmt->pWrapper, pReq, deployed) != 0) { if (mmWriteFile(pMgmt, pReq, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -117,10 +106,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { ...@@ -117,10 +106,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
return 0; return 0;
} }
static void mmClose(SMgmtWrapper *pWrapper) { static void mmClose(SMnodeMgmt *pMgmt) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("mnode-mgmt start to cleanup"); dInfo("mnode-mgmt start to cleanup");
if (pMgmt->pMnode != NULL) { if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
...@@ -128,12 +114,11 @@ static void mmClose(SMgmtWrapper *pWrapper) { ...@@ -128,12 +114,11 @@ static void mmClose(SMgmtWrapper *pWrapper) {
pMgmt->pMnode = NULL; pMgmt->pMnode = NULL;
} }
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
dInfo("mnode-mgmt is cleaned up"); dInfo("mnode-mgmt is cleaned up");
} }
static int32_t mmOpen(SMgmtWrapper *pWrapper) { static int32_t mmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("mnode-mgmt start to init"); dInfo("mnode-mgmt start to init");
if (walInit() != 0) { if (walInit() != 0) {
dError("failed to init wal since %s", terrstr()); dError("failed to init wal since %s", terrstr());
...@@ -146,23 +131,28 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { ...@@ -146,23 +131,28 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
pMgmt->path = pWrapper->path; pMgmt->path = pInput->path;
pMgmt->pDnode = pWrapper->pDnode; pMgmt->name = pInput->name;
pMgmt->pWrapper = pWrapper; pMgmt->dnodeId = pInput->dnodeId;
pWrapper->pMgmt = pMgmt; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.pMgmt = pMgmt;
bool deployed = false; bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) { if (mmReadFile(pMgmt, &deployed) != 0) {
dError("failed to read file since %s", terrstr()); dError("failed to read file since %s", terrstr());
mmClose(pWrapper); mmClose(pMgmt);
return -1; return -1;
} }
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (!deployed) { if (!deployed) {
dInfo("mnode start to deploy"); dInfo("mnode start to deploy");
pWrapper->pDnode->data.dnodeId = 1; pMgmt->dnodeId = 1;
mmBuildOptionForDeploy(pMgmt, &option); mmBuildOptionForDeploy(pMgmt, pInput, &option);
} else { } else {
dInfo("mnode start to open"); dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option); mmBuildOptionForOpen(pMgmt, &option);
...@@ -171,55 +161,52 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { ...@@ -171,55 +161,52 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
pMgmt->pMnode = mndOpen(pMgmt->path, &option); pMgmt->pMnode = mndOpen(pMgmt->path, &option);
if (pMgmt->pMnode == NULL) { if (pMgmt->pMnode == NULL) {
dError("failed to open mnode since %s", terrstr()); dError("failed to open mnode since %s", terrstr());
mmClose(pWrapper); mmClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "mnode-impl", "initialized"); tmsgReportStartup("mnode-impl", "initialized");
if (mmStartWorker(pMgmt) != 0) { if (mmStartWorker(pMgmt) != 0) {
dError("failed to start mnode worker since %s", terrstr()); dError("failed to start mnode worker since %s", terrstr());
mmClose(pWrapper); mmClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "mnode-worker", "initialized"); tmsgReportStartup("mnode-worker", "initialized");
if (!deployed) { if (!deployed) {
deployed = true; deployed = true;
if (mmWriteFile(pWrapper, NULL, deployed) != 0) { if (mmWriteFile(pMgmt, NULL, deployed) != 0) {
dError("failed to write mnode file since %s", terrstr()); dError("failed to write mnode file since %s", terrstr());
return -1; return -1;
} }
} }
pOutput->dnodeId = pMgmt->dnodeId;
pOutput->pMgmt = pMgmt;
dInfo("mnode-mgmt is initialized"); dInfo("mnode-mgmt is initialized");
return 0; return 0;
} }
static int32_t mmStart(SMgmtWrapper *pWrapper) { static int32_t mmStart(SMnodeMgmt *pMgmt) {
dDebug("mnode-mgmt start to run"); dDebug("mnode-mgmt start to run");
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndStart(pMgmt->pMnode); return mndStart(pMgmt->pMnode);
} }
static void mmStop(SMgmtWrapper *pWrapper) { static void mmStop(SMnodeMgmt *pMgmt) {
dDebug("mnode-mgmt start to stop"); dDebug("mnode-mgmt start to stop");
SMnodeMgmt *pMgmt = pWrapper->pMgmt; mndStop(pMgmt->pMnode);
if (pMgmt != NULL) {
mndStop(pMgmt->pMnode);
}
} }
void mmInitWrapper(SMgmtWrapper *pWrapper) { SMgmtFunc mmGetMgmtFunc() {
SMgmtFp mgmtFp = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFp.openFp = mmOpen; mgmtFunc.openFp = mmOpen;
mgmtFp.closeFp = mmClose; mgmtFunc.closeFp = (NodeCloseFp)mmClose;
mgmtFp.startFp = mmStart; mgmtFunc.startFp = (NodeStartFp)mmStart;
mgmtFp.stopFp = mmStop; mgmtFunc.stopFp = (NodeStopFp)mmStop;
mgmtFp.createFp = mmProcessCreateReq; mgmtFunc.createFp = (NodeCreateFp)mmProcessCreateReq;
mgmtFp.dropFp = mmProcessDropReq; mgmtFunc.dropFp = (NodeDropFp)mmProcessDropReq;
mgmtFp.requiredFp = mmRequire; mgmtFunc.requiredFp = mmRequire;
mgmtFunc.getHandlesFp = mmGetMsgHandles;
mmInitMsgHandle(pWrapper);
pWrapper->name = "mnode"; return mgmtFunc;
pWrapper->fp = mgmtFp;
} }
...@@ -30,9 +30,9 @@ static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { ...@@ -30,9 +30,9 @@ static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType;
tmsg_t msgType = pMsg->rpcMsg.msgType; bool isRequest = msgType & 1U;
dTrace("msg:%p, get from mnode queue", pMsg); dTrace("msg:%p, get from mnode queue", pMsg);
switch (msgType) { switch (msgType) {
...@@ -40,17 +40,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -40,17 +40,17 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
code = mmProcessAlterReq(pMgmt, pMsg); code = mmProcessAlterReq(pMgmt, pMsg);
break; break;
case TDMT_MON_MM_INFO: case TDMT_MON_MM_INFO:
code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); code = mmProcessGetMonitorInfoReq(pMgmt, pMsg);
break; break;
case TDMT_MON_MM_LOAD: case TDMT_MON_MM_LOAD:
code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); code = mmProcessGetLoadsReq(pMgmt, pMsg);
break; break;
default: default:
pMsg->pNode = pMgmt->pMnode; pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg); code = mndProcessMsg(pMsg);
} }
if (msgType & 1U) { if (isRequest) {
if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
...@@ -64,62 +64,46 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -64,62 +64,46 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
bool isRequest = msgType & 1U;
dTrace("msg:%p, get from mnode-query queue", pMsg); dTrace("msg:%p, get from mnode-query queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
pMsg->pNode = pMgmt->pMnode; pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg); code = mndProcessMsg(pMsg);
if (pRpc->msgType & 1U) { if (isRequest) {
if (pRpc->handle != NULL && code != 0) { if (pMsg->rpcMsg.handle != NULL && code != 0) {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} }
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->rpcMsg.msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
}
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); }
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
return 0;
}
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); }
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
return 0;
}
int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); }
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
return 0;
} }
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
return 0;
} }
static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
...@@ -129,25 +113,19 @@ static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { ...@@ -129,25 +113,19 @@ static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
return 0; return 0;
} }
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pRpc); return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pRpc);
} }
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pRpc); return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pRpc);
} }
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pRpc); return mmPutRpcMsgToWorker(&pMgmt->readWorker, pRpc);
} }
int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc); }
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = { SSingleWorkerCfg qCfg = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册