提交 628a5bba 编写于 作者: S Shengliang

refactor: node mgmt

上级 a27bffd6
...@@ -26,9 +26,10 @@ extern "C" { ...@@ -26,9 +26,10 @@ extern "C" {
typedef struct SSnodeMgmt { typedef struct SSnodeMgmt {
SSnode *pSnode; SSnode *pSnode;
SDnode *pDnode; SMsgCb msgCb;
SMgmtWrapper *pWrapper;
const char *path; const char *path;
const char *name;
int32_t dnodeId;
SRWLatch latch; SRWLatch latch;
int8_t uniqueWorkerInUse; int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SMultiWorker*> SArray *uniqueWorkers; // SArray<SMultiWorker*>
...@@ -37,19 +38,19 @@ typedef struct SSnodeMgmt { ...@@ -37,19 +38,19 @@ typedef struct SSnodeMgmt {
} SSnodeMgmt; } SSnodeMgmt;
// smHandle.c // smHandle.c
void smInitMsgHandle(SMgmtWrapper *pWrapper); SArray *smGetMsgHandles();
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq);
// smWorker.c // smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt); int32_t smStartWorker(SSnodeMgmt *pMgmt);
void smStopWorker(SSnodeMgmt *pMgmt); void smStopWorker(SSnodeMgmt *pMgmt);
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "smInt.h" #include "smInt.h"
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {} static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonSmInfo smInfo = {0}; SMonSmInfo smInfo = {0};
smGetMonitorInfo(pWrapper, &smInfo); smGetMonitorInfo(pMgmt, &smInfo);
dmGetMonitorSysInfo(&smInfo.sys); dmGetSystemInfo(&smInfo.sys);
monGetLogs(&smInfo.log); monGetLogs(&smInfo.log);
int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo); int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo);
...@@ -43,8 +43,7 @@ int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { ...@@ -43,8 +43,7 @@ int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return 0; return 0;
} }
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateSnodeReq createReq = {0}; SDCreateSnodeReq createReq = {0};
...@@ -53,14 +52,14 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -53,14 +52,14 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (createReq.dnodeId != pDnode->data.dnodeId) { if (createReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr()); dError("failed to create snode since %s", terrstr());
return -1; return -1;
} }
bool deployed = true; bool deployed = true;
if (dmWriteFile(pWrapper, deployed) != 0) { if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
dError("failed to write snode file since %s", terrstr()); dError("failed to write snode file since %s", terrstr());
return -1; return -1;
} }
...@@ -68,8 +67,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -68,8 +67,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropSnodeReq dropReq = {0}; SDDropSnodeReq dropReq = {0};
...@@ -78,14 +76,14 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -78,14 +76,14 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (dropReq.dnodeId != pDnode->data.dnodeId) { if (dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr()); dError("failed to drop snode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) { if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
dError("failed to write snode file since %s", terrstr()); dError("failed to write snode file since %s", terrstr());
return -1; return -1;
} }
...@@ -93,10 +91,23 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -93,10 +91,23 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
void smInitMsgHandle(SMgmtWrapper *pWrapper) { SArray *smGetMsgHandles() {
dmSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, 0); int32_t code = -1;
SArray *pArray = taosArrayInit(4, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by SNODE // Requests handled by SNODE
dmSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, 0); if (dmSetMgmtHandle(pArray, TDMT_SND_TASK_EXEC, smPutNodeMsgToExecQueue, 0) == NULL) goto _OVER;
code = 0;
_OVER:
if (code != 0) {
taosArrayDestroy(pArray);
return NULL;
} else {
return pArray;
}
} }
...@@ -17,34 +17,25 @@ ...@@ -17,34 +17,25 @@
#include "smInt.h" #include "smInt.h"
#include "libs/function/function.h" #include "libs/function/function.h"
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } static int32_t smRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
pOption->msgCb = msgCb;
} }
static void smClose(SMgmtWrapper *pWrapper) { static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; }
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void smClose(SSnodeMgmt *pMgmt) {
dInfo("snode-mgmt start to cleanup"); dInfo("snode-mgmt start to cleanup");
udfcClose();
if (pMgmt->pSnode != NULL) { if (pMgmt->pSnode != NULL) {
smStopWorker(pMgmt); smStopWorker(pMgmt);
sndClose(pMgmt->pSnode); sndClose(pMgmt->pSnode);
pMgmt->pSnode = NULL; pMgmt->pSnode = NULL;
} }
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
dInfo("snode-mgmt is cleaned up"); dInfo("snode-mgmt is cleaned up");
} }
int32_t smOpen(SMgmtWrapper *pWrapper) { int32_t smOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("snode-mgmt start to init"); dInfo("snode-mgmt start to init");
SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt)); SSnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SSnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -52,42 +43,47 @@ int32_t smOpen(SMgmtWrapper *pWrapper) { ...@@ -52,42 +43,47 @@ int32_t smOpen(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
pMgmt->path = pWrapper->path; pMgmt->path = pInput->path;
pMgmt->pDnode = pWrapper->pDnode; pMgmt->name = pInput->name;
pMgmt->pWrapper = pWrapper; pMgmt->dnodeId = pInput->dnodeId;
pWrapper->pMgmt = pMgmt; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.pMgmt = pMgmt;
SSnodeOpt option = {0}; SSnodeOpt option = {0};
smInitOption(pMgmt, &option); smInitOption(pMgmt, &option);
pMgmt->pSnode = sndOpen(pMgmt->path, &option); pMgmt->pSnode = sndOpen(pMgmt->path, &option);
if (pMgmt->pSnode == NULL) { if (pMgmt->pSnode == NULL) {
dError("failed to open snode since %s", terrstr()); dError("failed to open snode since %s", terrstr());
smClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "snode-impl", "initialized"); tmsgReportStartup("snode-impl", "initialized");
if (smStartWorker(pMgmt) != 0) { if (smStartWorker(pMgmt) != 0) {
dError("failed to start snode worker since %s", terrstr()); dError("failed to start snode worker since %s", terrstr());
smClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "snode-worker", "initialized"); tmsgReportStartup("snode-worker", "initialized");
if (udfcOpen() != 0) { if (udfcOpen() != 0) {
dError("failed to open udfc in snode"); dError("failed to open udfc in snode");
smClose(pMgmt);
return -1;
} }
pOutput->pMgmt = pMgmt;
return 0; return 0;
} }
void smInitWrapper(SMgmtWrapper *pWrapper) { SMgmtFunc smGetMgmtFunc() {
SMgmtFp mgmtFp = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFp.openFp = smOpen; mgmtFunc.openFp = smOpen;
mgmtFp.closeFp = smClose; mgmtFunc.closeFp = (NodeCloseFp)smClose;
mgmtFp.createFp = smProcessCreateReq; mgmtFunc.createFp = (NodeCreateFp)smProcessCreateReq;
mgmtFp.dropFp = smProcessDropReq; mgmtFunc.dropFp = (NodeDropFp)smProcessDropReq;
mgmtFp.requiredFp = smRequire; mgmtFunc.requiredFp = smRequire;
mgmtFunc.getHandlesFp = smGetMsgHandles;
smInitMsgHandle(pWrapper); return mgmtFunc;
pWrapper->name = "snode";
pWrapper->fp = mgmtFp;
} }
...@@ -36,7 +36,7 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -36,7 +36,7 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg); code = smProcessGetMonitorInfoReq(pMgmt, pMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
...@@ -163,8 +163,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { ...@@ -163,8 +163,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) { if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -176,8 +175,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -176,8 +175,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smPutNodeMsgToMonitorQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -185,8 +183,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -185,8 +183,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smPutNodeMsgToUniqueQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) { if (pWorker == NULL) {
...@@ -199,8 +196,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -199,8 +196,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smPutNodeMsgToSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->sharedWorker; SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -208,11 +204,11 @@ int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -208,11 +204,11 @@ int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smPutNodeMsgToExecQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg);
if (workerType == SND_WORKER_TYPE__SHARED) { if (workerType == SND_WORKER_TYPE__SHARED) {
return smProcessSharedMsg(pWrapper, pMsg); return smPutNodeMsgToSharedQueue(pMgmt, pMsg);
} else { } else {
return smProcessUniqueMsg(pWrapper, pMsg); return smPutNodeMsgToUniqueQueue(pMgmt, pMsg);
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册