提交 e00f74cc 编写于 作者: S Shengliang Guan

refact dm

上级 7e361ebc
......@@ -109,7 +109,7 @@ int32_t bmOpen(SMgmtWrapper *pWrapper) {
return code;
}
void bmGetMgmtFp(SMgmtWrapper *pWrapper) {
void bmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = bmOpen;
mgmtFp.closeFp = bmClose;
......
......@@ -39,18 +39,25 @@ typedef struct SDnodeMgmt {
SMgmtWrapper *pWrapper;
} SDnodeMgmt;
// dmInt.c
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
// dmFile.c
int32_t dmReadFile(SDnodeMgmt *pMgmt);
int32_t dmWriteFile(SDnodeMgmt *pMgmt);
void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps);
// dmMsg.c
// dmHandle.c
void dmInitMsgHandle(SMgmtWrapper *pWrapper);
void dmSendStatusReq(SDnodeMgmt *pMgmt);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
// dmWorker.c
int32_t dmStartThread(SDnodeMgmt *pMgmt);
......
......@@ -118,7 +118,7 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dndReleaseWrapper(pWrapper);
......@@ -146,7 +146,7 @@ static int32_t dndProcessCreateNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg
return code;
}
static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper == NULL) {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
......@@ -171,24 +171,24 @@ static int32_t dndProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *
return code;
}
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE:
return dndProcessCreateNodeMsg(pDnode, MNODE, pMsg);
return dmProcessCreateNodeMsg(pDnode, MNODE, pMsg);
case TDMT_DND_DROP_MNODE:
return dndProcessDropNodeMsg(pDnode, MNODE, pMsg);
return dmProcessDropNodeMsg(pDnode, MNODE, pMsg);
case TDMT_DND_CREATE_QNODE:
return dndProcessCreateNodeMsg(pDnode, QNODE, pMsg);
return dmProcessCreateNodeMsg(pDnode, QNODE, pMsg);
case TDMT_DND_DROP_QNODE:
return dndProcessDropNodeMsg(pDnode, QNODE, pMsg);
return dmProcessDropNodeMsg(pDnode, QNODE, pMsg);
case TDMT_DND_CREATE_SNODE:
return dndProcessCreateNodeMsg(pDnode, SNODE, pMsg);
return dmProcessCreateNodeMsg(pDnode, SNODE, pMsg);
case TDMT_DND_DROP_SNODE:
return dndProcessDropNodeMsg(pDnode, SNODE, pMsg);
return dmProcessDropNodeMsg(pDnode, SNODE, pMsg);
case TDMT_DND_CREATE_BNODE:
return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg);
return dmProcessCreateNodeMsg(pDnode, BNODE, pMsg);
case TDMT_DND_DROP_BNODE:
return dndProcessDropNodeMsg(pDnode, BNODE, pMsg);
return dmProcessDropNodeMsg(pDnode, BNODE, pMsg);
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
......
......@@ -78,7 +78,7 @@ static int32_t dmStart(SMgmtWrapper *pWrapper) {
return dmStartThread(pWrapper->pMgmt);
}
int32_t dmInit(SMgmtWrapper *pWrapper) {
static int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SDnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeMgmt));
dInfo("dnode-mgmt start to init");
......@@ -124,7 +124,7 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return 0;
}
void dmCleanup(SMgmtWrapper *pWrapper) {
static void dmCleanup(SMgmtWrapper *pWrapper) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
......@@ -153,12 +153,12 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt is cleaned up");
}
int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) {
static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) {
*required = true;
return 0;
}
void dmGetMgmtFp(SMgmtWrapper *pWrapper) {
void dmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = dmInit;
mgmtFp.closeFp = dmCleanup;
......
......@@ -66,16 +66,6 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in dnode queue", pMsg);
switch (pRpc->msgType) {
case TDMT_DND_CREATE_MNODE:
case TDMT_DND_CREATE_QNODE:
case TDMT_DND_CREATE_SNODE:
case TDMT_DND_CREATE_BNODE:
case TDMT_DND_DROP_MNODE:
case TDMT_DND_DROP_QNODE:
case TDMT_DND_DROP_SNODE:
case TDMT_DND_DROP_BNODE:
code = dndProcessNodeMsg(pMgmt->pDnode, pMsg);
break;
case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pMgmt, pMsg);
break;
......@@ -89,8 +79,8 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
code = dmProcessGrantRsp(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
dError("msg:%p, type:%s not processed in dnode queue", pRpc->handle, TMSG_INFO(pRpc->msgType));
code = dmProcessCDnodeMsg(pMgmt->pDnode, pMsg);
break;
}
if (pRpc->msgType & 1u) {
......
......@@ -27,13 +27,13 @@
#include "tlockfree.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "tprocess.h"
#include "tqueue.h"
#include "trpc.h"
#include "tthread.h"
#include "ttime.h"
#include "tworker.h"
#include "tmsgcb.h"
#include "dnode.h"
#include "monitor.h"
......@@ -42,12 +42,42 @@
extern "C" {
#endif
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#define dFatal(...) \
{ \
if (dDebugFlag & DEBUG_FATAL) { \
taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
}
#define dError(...) \
{ \
if (dDebugFlag & DEBUG_ERROR) { \
taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
}
#define dWarn(...) \
{ \
if (dDebugFlag & DEBUG_WARN) { \
taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define dInfo(...) \
{ \
if (dDebugFlag & DEBUG_INFO) { \
taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define dDebug(...) \
{ \
if (dDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); \
} \
}
#define dTrace(...) \
{ \
if (dDebugFlag & DEBUG_TRACE) { \
taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); \
} \
}
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
......@@ -73,7 +103,7 @@ typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
typedef struct SMsgHandle {
SMgmtWrapper *pQndWrapper;
SMgmtWrapper *pQndWrapper;
SMgmtWrapper *pMndWrapper;
SMgmtWrapper *pWrapper;
} SMsgHandle;
......@@ -146,14 +176,11 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
// dndMonitor.h
void dndSendMonitorReport(SDnode *pDnode);
// dndMsg.h
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
// dndStr.h
const char *dndStatStr(EDndStatus stat);
const char *dndNodeLogStr(ENodeType ntype);
......@@ -168,12 +195,12 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
int32_t dndInitMsgHandle(SDnode *pDnode);
// mgmt
void dmGetMgmtFp(SMgmtWrapper *pWrapper);
void bmGetMgmtFp(SMgmtWrapper *pWrapper);
void qmGetMgmtFp(SMgmtWrapper *pMgmt);
void smGetMgmtFp(SMgmtWrapper *pWrapper);
void vmGetMgmtFp(SMgmtWrapper *pWrapper);
void mmGetMgmtFp(SMgmtWrapper *pMgmt);
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
void bmSetMgmtFp(SMgmtWrapper *pWrapper);
void qmSetMgmtFp(SMgmtWrapper *pMgmt);
void smSetMgmtFp(SMgmtWrapper *pWrapper);
void vmSetMgmtFp(SMgmtWrapper *pWrapper);
void mmSetMgmtFp(SMgmtWrapper *pMgmt);
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
......@@ -194,8 +221,8 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -82,12 +82,12 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
}
dndSetStatus(pDnode, DND_STAT_INIT);
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
dmSetMgmtFp(&pDnode->wrappers[DNODE]);
mmSetMgmtFp(&pDnode->wrappers[MNODE]);
vmSetMgmtFp(&pDnode->wrappers[VNODES]);
qmSetMgmtFp(&pDnode->wrappers[QNODE]);
smSetMgmtFp(&pDnode->wrappers[SNODE]);
bmSetMgmtFp(&pDnode->wrappers[BNODE]);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
......
......@@ -227,7 +227,7 @@ static int32_t mmStart(SMgmtWrapper *pWrapper) {
return mndStart(pMgmt->pMnode);
}
void mmGetMgmtFp(SMgmtWrapper *pWrapper) {
void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = mmOpen;
mgmtFp.closeFp = mmClose;
......
......@@ -112,7 +112,7 @@ int32_t qmOpen(SMgmtWrapper *pWrapper) {
return code;
}
void qmGetMgmtFp(SMgmtWrapper *pWrapper) {
void qmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = qmOpen;
mgmtFp.closeFp = qmClose;
......
......@@ -109,7 +109,7 @@ int32_t smOpen(SMgmtWrapper *pWrapper) {
return code;
}
void smGetMgmtFp(SMgmtWrapper *pWrapper) {
void smSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = smOpen;
mgmtFp.closeFp = smClose;
......
......@@ -333,7 +333,7 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) {
return 0;
}
void vmGetMgmtFp(SMgmtWrapper *pWrapper) {
void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
SMgmtFp mgmtFp = {0};
mgmtFp.openFp = vmInit;
mgmtFp.closeFp = vmCleanup;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册