提交 a27bffd6 编写于 作者: S Shengliang

refactor: node mgmt

上级 47b9216b
...@@ -26,30 +26,31 @@ extern "C" { ...@@ -26,30 +26,31 @@ extern "C" {
typedef struct SQnodeMgmt { typedef struct SQnodeMgmt {
SQnode *pQnode; SQnode *pQnode;
SDnode *pDnode; SMsgCb msgCb;
SMgmtWrapper *pWrapper;
const char *path; const char *path;
const char *name;
int32_t dnodeId;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker fetchWorker; SSingleWorker fetchWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
} SQnodeMgmt; } SQnodeMgmt;
// qmHandle.c // qmHandle.c
void qmInitMsgHandle(SMgmtWrapper *pWrapper); SArray *qmGetMsgHandles();
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq);
// qmWorker.c // qmWorker.c
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t qmPutRpcMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype);
int32_t qmStartWorker(SQnodeMgmt *pMgmt); int32_t qmStartWorker(SQnodeMgmt *pMgmt);
void qmStopWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt);
int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "qmInt.h" #include "qmInt.h"
void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) {} static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMonQmInfo qmInfo = {0}; SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pWrapper, &qmInfo); qmGetMonitorInfo(pMgmt, &qmInfo);
dmGetMonitorSysInfo(&qmInfo.sys); dmGetSystemInfo(&qmInfo.sys);
monGetLogs(&qmInfo.log); monGetLogs(&qmInfo.log);
int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo); int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo);
...@@ -43,8 +43,7 @@ int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { ...@@ -43,8 +43,7 @@ int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return 0; return 0;
} }
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDCreateQnodeReq createReq = {0}; SDCreateQnodeReq createReq = {0};
...@@ -53,14 +52,14 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -53,14 +52,14 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (createReq.dnodeId != pDnode->data.dnodeId) { if (createReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr()); dError("failed to create qnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = true; bool deployed = true;
if (dmWriteFile(pWrapper, deployed) != 0) { if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr()); dError("failed to write qnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -68,8 +67,7 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -68,8 +67,7 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
SDDropQnodeReq dropReq = {0}; SDDropQnodeReq dropReq = {0};
...@@ -78,14 +76,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -78,14 +76,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1; return -1;
} }
if (dropReq.dnodeId != pDnode->data.dnodeId) { if (dropReq.dnodeId != pMgmt->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION; terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr()); dError("failed to drop qnode since %s", terrstr());
return -1; return -1;
} }
bool deployed = false; bool deployed = false;
if (dmWriteFile(pWrapper, deployed) != 0) { if (dmWriteFile(pMgmt->path, pMgmt->name, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr()); dError("failed to write qnode file since %s", terrstr());
return -1; return -1;
} }
...@@ -93,18 +91,31 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -93,18 +91,31 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
void qmInitMsgHandle(SMgmtWrapper *pWrapper) { SArray *qmGetMsgHandles() {
dmSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, 0); int32_t code = -1;
SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_QM_INFO, qmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
// Requests handled by VNODE // Requests handled by VNODE
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_RES_READY, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_TASKS_STATUS, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, 1); if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
if (code != 0) {
taosArrayDestroy(pArray);
return NULL;
} else {
return pArray;
}
} }
...@@ -16,21 +16,13 @@ ...@@ -16,21 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "qmInt.h" #include "qmInt.h"
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); } static int32_t qmRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.qsizeFp = qmGetQueueSize;
pOption->msgCb = msgCb;
} }
static void qmClose(SMgmtWrapper *pWrapper) { static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { pOption->msgCb = pMgmt->msgCb; }
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
static void qmClose(SQnodeMgmt *pMgmt) {
dInfo("qnode-mgmt start to cleanup"); dInfo("qnode-mgmt start to cleanup");
if (pMgmt->pQnode != NULL) { if (pMgmt->pQnode != NULL) {
qmStopWorker(pMgmt); qmStopWorker(pMgmt);
...@@ -38,12 +30,11 @@ static void qmClose(SMgmtWrapper *pWrapper) { ...@@ -38,12 +30,11 @@ static void qmClose(SMgmtWrapper *pWrapper) {
pMgmt->pQnode = NULL; pMgmt->pQnode = NULL;
} }
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
dInfo("qnode-mgmt is cleaned up"); dInfo("qnode-mgmt is cleaned up");
} }
static int32_t qmOpen(SMgmtWrapper *pWrapper) { static int32_t qmOpen(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
dInfo("qnode-mgmt start to init"); dInfo("qnode-mgmt start to init");
SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt)); SQnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SQnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
...@@ -51,41 +42,45 @@ static int32_t qmOpen(SMgmtWrapper *pWrapper) { ...@@ -51,41 +42,45 @@ static int32_t qmOpen(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
pMgmt->path = pWrapper->path; pMgmt->path = pInput->path;
pMgmt->pDnode = pWrapper->pDnode; pMgmt->name = pInput->name;
pMgmt->pWrapper = pWrapper; pMgmt->dnodeId = pInput->dnodeId;
pWrapper->pMgmt = pMgmt; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)qmGetQueueSize;
pMgmt->msgCb.pMgmt = pMgmt;
SQnodeOpt option = {0}; SQnodeOpt option = {0};
qmInitOption(pMgmt, &option); qmInitOption(pMgmt, &option);
pMgmt->pQnode = qndOpen(&option); pMgmt->pQnode = qndOpen(&option);
if (pMgmt->pQnode == NULL) { if (pMgmt->pQnode == NULL) {
dError("failed to open qnode since %s", terrstr()); dError("failed to open qnode since %s", terrstr());
qmClose(pWrapper); qmClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "qnode-impl", "initialized"); tmsgReportStartup("qnode-impl", "initialized");
if (qmStartWorker(pMgmt) != 0) { if (qmStartWorker(pMgmt) != 0) {
dError("failed to start qnode worker since %s", terrstr()); dError("failed to start qnode worker since %s", terrstr());
qmClose(pWrapper); qmClose(pMgmt);
return -1; return -1;
} }
dmReportStartup(pWrapper->pDnode, "qnode-worker", "initialized"); tmsgReportStartup("qnode-worker", "initialized");
pOutput->pMgmt = pMgmt;
dInfo("qnode-mgmt is initialized"); dInfo("qnode-mgmt is initialized");
return 0; return 0;
} }
void qmInitWrapper(SMgmtWrapper *pWrapper) { SMgmtFunc qmGetMgmtFunc() {
SMgmtFp mgmtFp = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFp.openFp = qmOpen; mgmtFunc.openFp = qmOpen;
mgmtFp.closeFp = qmClose; mgmtFunc.closeFp = (NodeCloseFp)qmClose;
mgmtFp.createFp = qmProcessCreateReq; mgmtFunc.createFp = (NodeCreateFp)qmProcessCreateReq;
mgmtFp.dropFp = qmProcessDropReq; mgmtFunc.dropFp = (NodeDropFp)qmProcessDropReq;
mgmtFp.requiredFp = qmRequire; mgmtFunc.requiredFp = qmRequire;
mgmtFunc.getHandlesFp = qmGetMsgHandles;
qmInitMsgHandle(pWrapper); return mgmtFunc;
pWrapper->name = "qnode";
pWrapper->fp = mgmtFp;
} }
...@@ -36,7 +36,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -36,7 +36,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_QM_INFO) { if (pMsg->rpcMsg.msgType == TDMT_MON_QM_INFO) {
code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg); code = qmProcessGetMonitorInfoReq(pMgmt, pMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
...@@ -83,27 +83,22 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -83,27 +83,22 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { static int32_t qmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
return 0;
} }
int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmPutNodeMsgToQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt; return qmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
qmPutMsgToWorker(&pMgmt->queryWorker, pMsg);
return 0;
} }
int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt; return qmPutNodeMsgToWorker(&pMgmt->fetchWorker, pMsg);
qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg);
return 0;
} }
int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt; return qmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
qmPutMsgToWorker(&pMgmt->monitorWorker, pMsg);
return 0;
} }
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
...@@ -118,19 +113,16 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SR ...@@ -118,19 +113,16 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SR
return 0; return 0;
} }
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pRpc) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc); return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
} }
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { int32_t qmPutRpcMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pRpc) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc); return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
} }
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t size = -1; int32_t size = -1;
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册