提交 ede2bc03 编写于 作者: S Shengliang Guan

refactor: node mgmt

上级 ff2f5d28
...@@ -51,7 +51,7 @@ int32_t dmReadEps(SDnode *pDnode) { ...@@ -51,7 +51,7 @@ int32_t dmReadEps(SDnode *pDnode) {
pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pDnode->data.dnodeEps == NULL) { if (pDnode->data.dnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno)); dError("failed to calloc dnodeEp array since %s", strerror(errno));
goto PRASE_DNODE_OVER; goto _OVER;
} }
snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
...@@ -59,53 +59,53 @@ int32_t dmReadEps(SDnode *pDnode) { ...@@ -59,53 +59,53 @@ int32_t dmReadEps(SDnode *pDnode) {
if (pFile == NULL) { if (pFile == NULL) {
// dDebug("file %s not exist", file); // dDebug("file %s not exist", file);
code = 0; code = 0;
goto PRASE_DNODE_OVER; goto _OVER;
} }
len = (int32_t)taosReadFile(pFile, content, maxLen); len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", file); dError("failed to read %s since content is null", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", file); dError("failed to read %s since invalid json format", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) { if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read %s since dnodeId not found", file); dError("failed to read %s since dnodeId not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
pDnode->data.dnodeId = dnodeId->valueint; pDnode->data.dnodeId = dnodeId->valueint;
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
if (!clusterId || clusterId->type != cJSON_String) { if (!clusterId || clusterId->type != cJSON_String) {
dError("failed to read %s since clusterId not found", file); dError("failed to read %s since clusterId not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
pDnode->data.clusterId = atoll(clusterId->valuestring); pDnode->data.clusterId = atoll(clusterId->valuestring);
cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) { if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file); dError("failed to read %s since dropped not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
pDnode->data.dropped = dropped->valueint; pDnode->data.dropped = dropped->valueint;
cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
if (!dnodes || dnodes->type != cJSON_Array) { if (!dnodes || dnodes->type != cJSON_Array) {
dError("failed to read %s since dnodes not found", file); dError("failed to read %s since dnodes not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
int32_t numOfDnodes = cJSON_GetArraySize(dnodes); int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
if (numOfDnodes <= 0) { if (numOfDnodes <= 0) {
dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes); dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
goto PRASE_DNODE_OVER; goto _OVER;
} }
for (int32_t i = 0; i < numOfDnodes; ++i) { for (int32_t i = 0; i < numOfDnodes; ++i) {
...@@ -117,7 +117,7 @@ int32_t dmReadEps(SDnode *pDnode) { ...@@ -117,7 +117,7 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON *did = cJSON_GetObjectItem(node, "id"); cJSON *did = cJSON_GetObjectItem(node, "id");
if (!did || did->type != cJSON_Number) { if (!did || did->type != cJSON_Number) {
dError("failed to read %s since dnodeId not found", file); dError("failed to read %s since dnodeId not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
dnodeEp.id = did->valueint; dnodeEp.id = did->valueint;
...@@ -125,14 +125,14 @@ int32_t dmReadEps(SDnode *pDnode) { ...@@ -125,14 +125,14 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
dError("failed to read %s since dnodeFqdn not found", file); dError("failed to read %s since dnodeFqdn not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
if (!dnodePort || dnodePort->type != cJSON_Number) { if (!dnodePort || dnodePort->type != cJSON_Number) {
dError("failed to read %s since dnodePort not found", file); dError("failed to read %s since dnodePort not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
dnodeEp.ep.port = dnodePort->valueint; dnodeEp.ep.port = dnodePort->valueint;
...@@ -140,7 +140,7 @@ int32_t dmReadEps(SDnode *pDnode) { ...@@ -140,7 +140,7 @@ int32_t dmReadEps(SDnode *pDnode) {
cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
if (!isMnode || isMnode->type != cJSON_Number) { if (!isMnode || isMnode->type != cJSON_Number) {
dError("failed to read %s since isMnode not found", file); dError("failed to read %s since isMnode not found", file);
goto PRASE_DNODE_OVER; goto _OVER;
} }
dnodeEp.isMnode = isMnode->valueint; dnodeEp.isMnode = isMnode->valueint;
...@@ -151,7 +151,7 @@ int32_t dmReadEps(SDnode *pDnode) { ...@@ -151,7 +151,7 @@ int32_t dmReadEps(SDnode *pDnode) {
dDebug("succcessed to read file %s", file); dDebug("succcessed to read file %s", file);
dmPrintEps(pDnode); dmPrintEps(pDnode);
PRASE_DNODE_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
...@@ -176,7 +176,7 @@ PRASE_DNODE_OVER: ...@@ -176,7 +176,7 @@ PRASE_DNODE_OVER:
int32_t dmWriteEps(SDnode *pDnode) { int32_t dmWriteEps(SDnode *pDnode) {
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX]; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
......
...@@ -105,12 +105,13 @@ void dmStopMonitorThread(SDnode *pDnode) { ...@@ -105,12 +105,13 @@ void dmStopMonitorThread(SDnode *pDnode) {
} }
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SDnode * pDnode = pInfo->ahandle; SDnode *pDnode = pInfo->ahandle;
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg); dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);
switch (pRpc->msgType) { switch (msgType) {
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pDnode, pMsg); code = dmProcessConfigReq(pDnode, pMsg);
break; break;
...@@ -148,9 +149,14 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -148,9 +149,14 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
break; break;
} }
if (pRpc->msgType & 1u) { if (msgType & 1u) {
if (code != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId}; SRpcMsg rsp = {
.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.refId = pMsg->rpcMsg.refId,
};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
} }
...@@ -160,7 +166,13 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -160,7 +166,13 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
} }
int32_t dmStartWorker(SDnode *pDnode) { int32_t dmStartWorker(SDnode *pDnode) {
SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pDnode}; SSingleWorkerCfg cfg = {
.min = 1,
.max = 1,
.name = "dnode-mgmt",
.fp = (FItem)dmProcessMgmtQueue,
.param = pDnode,
};
if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) { if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) {
dError("failed to start dnode-mgmt worker since %s", terrstr()); dError("failed to start dnode-mgmt worker since %s", terrstr());
return -1; return -1;
......
...@@ -18,7 +18,11 @@ ...@@ -18,7 +18,11 @@
static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) { static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, .refId = pMsg->rpcMsg.refId}; .handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.refId = pMsg->rpcMsg.refId,
};
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
...@@ -103,7 +107,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -103,7 +107,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt * pMgmt = pWrapper->pMgmt; SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = &pMgmt->writeWorker; SMultiWorker *pWorker = &pMgmt->writeWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -112,7 +116,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -112,7 +116,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt * pMgmt = pWrapper->pMgmt; SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -121,7 +125,12 @@ int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -121,7 +125,12 @@ int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t bmStartWorker(SBnodeMgmt *pMgmt) { int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {
.max = 1,
.name = "bnode-write",
.fp = (FItems)bmProcessWriteQueue,
.param = pMgmt,
};
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode-write worker since %s", terrstr()); dError("failed to start bnode-write worker since %s", terrstr());
return -1; return -1;
...@@ -129,7 +138,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { ...@@ -129,7 +138,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue, .param = pMgmt}; .min = 1,
.max = 1,
.name = "bnode-monitor",
.fp = (FItem)bmProcessMonitorQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start bnode-monitor worker since %s", terrstr()); dError("failed to start bnode-monitor worker since %s", terrstr());
return -1; return -1;
......
...@@ -22,7 +22,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -22,7 +22,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
int32_t maxLen = 4096; int32_t maxLen = 4096;
char *content = taosMemoryCalloc(1, maxLen + 1); char *content = taosMemoryCalloc(1, maxLen + 1);
cJSON *root = NULL; cJSON *root = NULL;
char file[PATH_MAX]; char file[PATH_MAX] = {0};
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
...@@ -30,39 +30,39 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -30,39 +30,39 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
if (pFile == NULL) { if (pFile == NULL) {
// dDebug("file %s not exist", file); // dDebug("file %s not exist", file);
code = 0; code = 0;
goto PRASE_MNODE_OVER; goto _OVER;
} }
len = (int32_t)taosReadFile(pFile, content, maxLen); len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", file); dError("failed to read %s since content is null", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
content[len] = 0; content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", file); dError("failed to read %s since invalid json format", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) { if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", file); dError("failed to read %s since deployed not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
*pDeployed = deployed->valueint; *pDeployed = deployed->valueint;
cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
if (!mnodes || mnodes->type != cJSON_Array) { if (!mnodes || mnodes->type != cJSON_Array) {
dError("failed to read %s since nodes not found", file); dError("failed to read %s since nodes not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
pMgmt->replica = cJSON_GetArraySize(mnodes); pMgmt->replica = cJSON_GetArraySize(mnodes);
if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
goto PRASE_MNODE_OVER; goto _OVER;
} }
for (int32_t i = 0; i < pMgmt->replica; ++i) { for (int32_t i = 0; i < pMgmt->replica; ++i) {
...@@ -74,21 +74,21 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -74,21 +74,21 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
cJSON *id = cJSON_GetObjectItem(node, "id"); cJSON *id = cJSON_GetObjectItem(node, "id");
if (!id || id->type != cJSON_Number) { if (!id || id->type != cJSON_Number) {
dError("failed to read %s since id not found", file); dError("failed to read %s since id not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
pReplica->id = id->valueint; pReplica->id = id->valueint;
cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
dError("failed to read %s since fqdn not found", file); dError("failed to read %s since fqdn not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
cJSON *port = cJSON_GetObjectItem(node, "port"); cJSON *port = cJSON_GetObjectItem(node, "port");
if (!port || port->type != cJSON_Number) { if (!port || port->type != cJSON_Number) {
dError("failed to read %s since port not found", file); dError("failed to read %s since port not found", file);
goto PRASE_MNODE_OVER; goto _OVER;
} }
pReplica->port = port->valueint; pReplica->port = port->valueint;
} }
...@@ -96,7 +96,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { ...@@ -96,7 +96,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
code = 0; code = 0;
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
PRASE_MNODE_OVER: _OVER:
if (content != NULL) taosMemoryFree(content); if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile); if (pFile != NULL) taosCloseFile(&pFile);
......
...@@ -161,9 +161,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { ...@@ -161,9 +161,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (!deployed) { if (!deployed) {
dInfo("mnode start to deploy"); dInfo("mnode start to deploy");
// if (pWrapper->procType == DND_PROC_CHILD) { pWrapper->pDnode->data.dnodeId = 1;
pWrapper->pDnode->data.dnodeId = 1;
// }
mmBuildOptionForDeploy(pMgmt, &option); mmBuildOptionForDeploy(pMgmt, &option);
} else { } else {
dInfo("mnode start to open"); dInfo("mnode start to open");
......
...@@ -17,42 +17,48 @@ ...@@ -17,42 +17,48 @@
#include "mmInt.h" #include "mmInt.h"
static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {
.ahandle = pMsg->rpcMsg.ahandle, .handle = pMsg->rpcMsg.handle,
.refId = pMsg->rpcMsg.refId, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp, .code = code,
.contLen = pMsg->rspLen}; .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, get from mnode queue", pMsg); dTrace("msg:%p, get from mnode queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) { switch (msgType) {
code = mmProcessAlterReq(pMgmt, pMsg); case TDMT_DND_ALTER_MNODE:
} else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) { code = mmProcessAlterReq(pMgmt, pMsg);
code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); break;
} else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_LOAD) { case TDMT_MON_MM_INFO:
code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg);
} else { break;
pMsg->pNode = pMgmt->pMnode; case TDMT_MON_MM_LOAD:
code = mndProcessMsg(pMsg); code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg);
break;
default:
pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg);
} }
if (pRpc->msgType & 1U) { if (msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
mmSendRsp(pMsg, code); mmSendRsp(pMsg, code);
} }
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
...@@ -78,38 +84,38 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -78,38 +84,38 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void mmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { static void mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
} }
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->writeWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->syncWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->readWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->queryWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
return 0; return 0;
} }
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mmPutMsgToWorker(&pMgmt->monitorWorker, pMsg); mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
return 0; return 0;
} }
...@@ -144,40 +150,62 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { ...@@ -144,40 +150,62 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads, SSingleWorkerCfg qCfg = {
.max = tsNumOfMnodeQueryThreads, .min = tsNumOfMnodeQueryThreads,
.name = "mnode-query", .max = tsNumOfMnodeQueryThreads,
.fp = (FItem)mmProcessQueryQueue, .name = "mnode-query",
.param = pMgmt}; .fp = (FItem)mmProcessQueryQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
dError("failed to start mnode-query worker since %s", terrstr()); dError("failed to start mnode-query worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads, SSingleWorkerCfg rCfg = {
.max = tsNumOfMnodeReadThreads, .min = tsNumOfMnodeReadThreads,
.name = "mnode-read", .max = tsNumOfMnodeReadThreads,
.fp = (FItem)mmProcessQueue, .name = "mnode-read",
.param = pMgmt}; .fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr()); dError("failed to start mnode-read worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg wCfg = {
.min = 1,
.max = 1,
.name = "mnode-write",
.fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
dError("failed to start mnode-write worker since %s", terrstr()); dError("failed to start mnode-write worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg sCfg = {
.min = 1,
.max = 1,
.name = "mnode-sync",
.fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
dError("failed to start mnode mnode-sync worker since %s", terrstr()); dError("failed to start mnode mnode-sync worker since %s", terrstr());
return -1; return -1;
} }
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg mCfg = {
.min = 1,
.max = 1,
.name = "mnode-monitor",
.fp = (FItem)mmProcessQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode mnode-monitor worker since %s", terrstr()); dError("failed to start mnode mnode-monitor worker since %s", terrstr());
return -1; return -1;
......
...@@ -17,12 +17,14 @@ ...@@ -17,12 +17,14 @@
#include "qmInt.h" #include "qmInt.h"
static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {
.ahandle = pMsg->rpcMsg.ahandle, .handle = pMsg->rpcMsg.handle,
.refId = pMsg->rpcMsg.refId, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp, .code = code,
.contLen = pMsg->rspLen}; .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
...@@ -145,22 +147,26 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { ...@@ -145,22 +147,26 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
} }
int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
SSingleWorkerCfg queryCfg = {.min = tsNumOfVnodeQueryThreads, SSingleWorkerCfg queryCfg = {
.max = tsNumOfVnodeQueryThreads, .min = tsNumOfVnodeQueryThreads,
.name = "qnode-query", .max = tsNumOfVnodeQueryThreads,
.fp = (FItem)qmProcessQueryQueue, .name = "qnode-query",
.param = pMgmt}; .fp = (FItem)qmProcessQueryQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
dError("failed to start qnode-query worker since %s", terrstr()); dError("failed to start qnode-query worker since %s", terrstr());
return -1; return -1;
} }
SSingleWorkerCfg fetchCfg = {.min = tsNumOfQnodeFetchThreads, SSingleWorkerCfg fetchCfg = {
.max = tsNumOfQnodeFetchThreads, .min = tsNumOfQnodeFetchThreads,
.name = "qnode-fetch", .max = tsNumOfQnodeFetchThreads,
.fp = (FItem)qmProcessFetchQueue, .name = "qnode-fetch",
.param = pMgmt}; .fp = (FItem)qmProcessFetchQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
dError("failed to start qnode-fetch worker since %s", terrstr()); dError("failed to start qnode-fetch worker since %s", terrstr());
...@@ -169,7 +175,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { ...@@ -169,7 +175,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonitorQueue, .param = pMgmt}; .min = 1,
.max = 1,
.name = "qnode-monitor",
.fp = (FItem)qmProcessMonitorQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start qnode-monitor worker since %s", terrstr()); dError("failed to start qnode-monitor worker since %s", terrstr());
return -1; return -1;
......
...@@ -17,12 +17,14 @@ ...@@ -17,12 +17,14 @@
#include "smInt.h" #include "smInt.h"
static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) { static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {
.ahandle = pMsg->rpcMsg.ahandle, .handle = pMsg->rpcMsg.handle,
.refId = pMsg->rpcMsg.refId, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .refId = pMsg->rpcMsg.refId,
.pCont = pMsg->pRsp, .code = code,
.contLen = pMsg->rspLen}; .pCont = pMsg->pRsp,
.contLen = pMsg->rspLen,
};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
...@@ -90,7 +92,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { ...@@ -90,7 +92,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {
.max = 1,
.name = "snode-unique",
.fp = smProcessUniqueQueue,
.param = pMgmt,
};
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr()); dError("failed to start snode-unique worker since %s", terrstr());
return -1; return -1;
...@@ -101,11 +108,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { ...@@ -101,11 +108,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
} }
} }
SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads, SSingleWorkerCfg cfg = {
.max = tsNumOfSnodeSharedThreads, .min = tsNumOfSnodeSharedThreads,
.name = "snode-shared", .max = tsNumOfSnodeSharedThreads,
.fp = (FItem)smProcessSharedQueue, .name = "snode-shared",
.param = pMgmt}; .fp = (FItem)smProcessSharedQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
dError("failed to start snode shared-worker since %s", terrstr()); dError("failed to start snode shared-worker since %s", terrstr());
...@@ -114,7 +123,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { ...@@ -114,7 +123,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt}; .min = 1,
.max = 1,
.name = "snode-monitor",
.fp = (FItem)smProcessMonitorQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start snode-monitor worker since %s", terrstr()); dError("failed to start snode-monitor worker since %s", terrstr());
return -1; return -1;
...@@ -150,7 +164,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { ...@@ -150,7 +164,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
} }
int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
if (pWorker == NULL) { if (pWorker == NULL) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -163,7 +177,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -163,7 +177,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -172,7 +186,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -172,7 +186,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
if (pWorker == NULL) { if (pWorker == NULL) {
...@@ -186,7 +200,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -186,7 +200,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt * pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->sharedWorker; SSingleWorker *pWorker = &pMgmt->sharedWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册