提交 47b9216b 编写于 作者: S Shengliang

refactor: node mgmt

上级 6296188f
...@@ -26,24 +26,25 @@ extern "C" { ...@@ -26,24 +26,25 @@ extern "C" {
typedef struct SBnodeMgmt { typedef struct SBnodeMgmt {
SBnode *pBnode; SBnode *pBnode;
SDnode *pDnode; SMsgCb msgCb;
SMgmtWrapper *pWrapper;
const char *path; const char *path;
const char *name;
int32_t dnodeId;
SMultiWorker writeWorker; SMultiWorker writeWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
} SBnodeMgmt; } SBnodeMgmt;
// bmHandle.c // bmHandle.c
void bmInitMsgHandle(SMgmtWrapper *pWrapper); SArray *bmGetMsgHandles();
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq);
// bmWorker.c // bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt); int32_t bmStartWorker(SBnodeMgmt *pMgmt);
void bmStopWorker(SBnodeMgmt *pMgmt); void bmStopWorker(SBnodeMgmt *pMgmt);
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "bmInt.h" #include "bmInt.h"
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) {} static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonBmInfo bmInfo = {0}; SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pWrapper, &bmInfo); bmGetMonitorInfo(pMgmt, &bmInfo);
dmGetMonitorSysInfo(&bmInfo.sys); dmGetSystemInfo(&bmInfo.sys);
monGetLogs(&bmInfo.log); monGetLogs(&bmInfo.log);
int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo); int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo);
...@@ -43,8 +43,7 @@ int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { ...@@ -43,8 +43,7 @@ int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return 0; return 0;
} }
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessCreateReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateBnodeReq createReq = {0}; SDCreateBnodeReq createReq = {0};
...@@ -53,14 +52,14 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -53,14 +52,14 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (pDnode->data.dnodeId != 0 && createReq.dnodeId != pDnode->data.dnodeId) { if (pMgmt->dnodeId != 0 && createReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->data.dnodeId); dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pMgmt->dnodeId);
return -1; return -1;
} }
bool deployed = true; bool deployed = true;
if (dmWriteFile(pWrapper, deployed) != 0) { if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr()); dError("failed to write bnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -68,8 +67,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -68,8 +67,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessDropReq(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropBnodeReq dropReq = {0}; SDDropBnodeReq dropReq = {0};
...@@ -78,14 +76,14 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -78,14 +76,14 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (dropReq.dnodeId != pDnode->data.dnodeId) { if (dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr()); dError("failed to drop bnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) { if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
dError("failed to write bnode file since %s", terrstr()); dError("failed to write bnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -93,6 +91,19 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -93,6 +91,19 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
void bmInitMsgHandle(SMgmtWrapper *pWrapper) { SArray *bmGetMsgHandles() {
dmSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, 0); int32_t code = -1;
SArray *pArray = taosArrayInit(2, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_BM_INFO, bmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
code = 0;
_OVER:
if (code != 0) {
taosArrayDestroy(pArray);
return NULL;
} else {
return pArray;
}
} }
...@@ -16,18 +16,13 @@ ...@@ -16,18 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "bmInt.h" #include "bmInt.h"
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } static int32_t bmRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
pOption->msgCb = msgCb;
} }
static void bmClose(SMgmtWrapper *pWrapper) { static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; }
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void bmClose(SBnodeMgmt *pMgmt) {
dInfo("bnode-mgmt start to cleanup"); dInfo("bnode-mgmt start to cleanup");
if (pMgmt->pBnode != NULL) { if (pMgmt->pBnode != NULL) {
bmStopWorker(pMgmt); bmStopWorker(pMgmt);
...@@ -35,12 +30,11 @@ static void bmClose(SMgmtWrapper *pWrapper) { ...@@ -35,12 +30,11 @@ static void bmClose(SMgmtWrapper *pWrapper) {
pMgmt->pBnode = NULL; pMgmt->pBnode = NULL;
} }
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
dInfo("bnode-mgmt is cleaned up"); dInfo("bnode-mgmt is cleaned up");
} }
int32_t bmOpen(SMgmtWrapper *pWrapper) { int32_t bmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("bnode-mgmt start to init"); dInfo("bnode-mgmt start to init");
SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt)); SBnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SBnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -48,40 +42,42 @@ int32_t bmOpen(SMgmtWrapper *pWrapper) { ...@@ -48,40 +42,42 @@ int32_t bmOpen(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
pMgmt->path = pWrapper->path; pMgmt->path = pInput->path;
pMgmt->pDnode = pWrapper->pDnode; pMgmt->name = pInput->name;
pMgmt->pWrapper = pWrapper; pMgmt->dnodeId = pInput->dnodeId;
pWrapper->pMgmt = pMgmt; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt;
SBnodeOpt option = {0}; SBnodeOpt option = {0};
bmInitOption(pMgmt, &option); bmInitOption(pMgmt, &option);
pMgmt->pBnode = bndOpen(pMgmt->path, &option); pMgmt->pBnode = bndOpen(pMgmt->path, &option);
if (pMgmt->pBnode == NULL) { if (pMgmt->pBnode == NULL) {
dError("failed to open bnode since %s", terrstr()); dError("failed to open bnode since %s", terrstr());
bmClose(pWrapper); bmClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "bnode-impl", "initialized"); tmsgReportStartup("bnode-impl", "initialized");
if (bmStartWorker(pMgmt) != 0) { if (bmStartWorker(pMgmt) != 0) {
dError("failed to start bnode worker since %s", terrstr()); dError("failed to start bnode worker since %s", terrstr());
bmClose(pWrapper); bmClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "bnode-worker", "initialized"); tmsgReportStartup("bnode-worker", "initialized");
pOutput->pMgmt = pMgmt;
dInfo("bnode-mgmt is initialized");
return 0; return 0;
} }
void bmInitWrapper(SMgmtWrapper *pWrapper) { SMgmtFunc bmGetMgmtFunc() {
SMgmtFp mgmtFp = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFp.openFp = bmOpen; mgmtFunc.openFp = bmOpen;
mgmtFp.closeFp = bmClose; mgmtFunc.closeFp = (NodeCloseFp)bmClose;
mgmtFp.createFp = bmProcessCreateReq; mgmtFunc.createFp = (NodeCreateFp)bmProcessCreateReq;
mgmtFp.dropFp = bmProcessDropReq; mgmtFunc.dropFp = (NodeDropFp)bmProcessDropReq;
mgmtFp.requiredFp = bmRequire; mgmtFunc.requiredFp = bmRequire;
mgmtFunc.getHandlesFp = bmGetMsgHandles;
bmInitMsgHandle(pWrapper); return mgmtFunc;
pWrapper->name = "bnode";
pWrapper->fp = mgmtFp;
} }
...@@ -58,7 +58,7 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -58,7 +58,7 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) { if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) {
code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg); code = bmProcessGetMonBmInfoReq(pMgmt, pMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
...@@ -106,8 +106,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -106,8 +106,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmPutNodeMsgToWriteQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = &pMgmt->writeWorker; SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -115,8 +114,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -115,8 +114,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmPutNodeMsgToMonitorQueue(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册