From ede2bc03fcec9e03e710604fd561f7087995fea3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 11:04:16 +0800 Subject: [PATCH] refactor: node mgmt --- source/dnode/mgmt/implement/src/dmEps.c | 30 +++--- source/dnode/mgmt/implement/src/dmWorker.c | 28 ++++-- source/dnode/mgmt/mgmt_bnode/src/bmWorker.c | 24 ++++- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 22 ++-- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 4 +- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 106 +++++++++++++------- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 45 +++++---- source/dnode/mgmt/mgmt_snode/src/smWorker.c | 48 +++++---- 8 files changed, 192 insertions(+), 115 deletions(-) diff --git a/source/dnode/mgmt/implement/src/dmEps.c b/source/dnode/mgmt/implement/src/dmEps.c index 853c238316..f5c9a1d91b 100644 --- a/source/dnode/mgmt/implement/src/dmEps.c +++ b/source/dnode/mgmt/implement/src/dmEps.c @@ -51,7 +51,7 @@ int32_t dmReadEps(SDnode *pDnode) { pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); if (pDnode->data.dnodeEps == NULL) { dError("failed to calloc dnodeEp array since %s", strerror(errno)); - goto PRASE_DNODE_OVER; + goto _OVER; } snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP); @@ -59,53 +59,53 @@ int32_t dmReadEps(SDnode *pDnode) { if (pFile == NULL) { // dDebug("file %s not exist", file); code = 0; - goto PRASE_DNODE_OVER; + goto _OVER; } len = (int32_t)taosReadFile(pFile, content, maxLen); if (len <= 0) { dError("failed to read %s since content is null", file); - goto PRASE_DNODE_OVER; + goto _OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { dError("failed to read %s since invalid json format", file); - goto PRASE_DNODE_OVER; + goto _OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_Number) { dError("failed to read %s since dnodeId not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } pDnode->data.dnodeId = dnodeId->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { dError("failed to read %s since clusterId not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } pDnode->data.clusterId = atoll(clusterId->valuestring); cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { dError("failed to read %s since dropped not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } pDnode->data.dropped = dropped->valueint; cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes"); if (!dnodes || dnodes->type != cJSON_Array) { dError("failed to read %s since dnodes not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } int32_t numOfDnodes = cJSON_GetArraySize(dnodes); if (numOfDnodes <= 0) { dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes); - goto PRASE_DNODE_OVER; + goto _OVER; } for (int32_t i = 0; i < numOfDnodes; ++i) { @@ -117,7 +117,7 @@ int32_t dmReadEps(SDnode *pDnode) { cJSON *did = cJSON_GetObjectItem(node, "id"); if (!did || did->type != cJSON_Number) { dError("failed to read %s since dnodeId not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } dnodeEp.id = did->valueint; @@ -125,14 +125,14 @@ int32_t dmReadEps(SDnode *pDnode) { cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { dError("failed to read %s since dnodeFqdn not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); if (!dnodePort || dnodePort->type != cJSON_Number) { dError("failed to read %s since dnodePort not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } dnodeEp.ep.port = dnodePort->valueint; @@ -140,7 +140,7 @@ int32_t dmReadEps(SDnode *pDnode) { cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); if (!isMnode || isMnode->type != cJSON_Number) { dError("failed to read %s since isMnode not found", file); - goto PRASE_DNODE_OVER; + goto _OVER; } dnodeEp.isMnode = isMnode->valueint; @@ -151,7 +151,7 @@ int32_t dmReadEps(SDnode *pDnode) { dDebug("succcessed to read file %s", file); dmPrintEps(pDnode); -PRASE_DNODE_OVER: +_OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); @@ -176,7 +176,7 @@ PRASE_DNODE_OVER: int32_t dmWriteEps(SDnode *pDnode) { char file[PATH_MAX] = {0}; - char realfile[PATH_MAX]; + char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP); diff --git a/source/dnode/mgmt/implement/src/dmWorker.c b/source/dnode/mgmt/implement/src/dmWorker.c index b19c2ab36b..72b2111591 100644 --- a/source/dnode/mgmt/implement/src/dmWorker.c +++ b/source/dnode/mgmt/implement/src/dmWorker.c @@ -105,12 +105,13 @@ void dmStopMonitorThread(SDnode *pDnode) { } static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { - SDnode * pDnode = pInfo->ahandle; - SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; + SDnode *pDnode = pInfo->ahandle; + + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg); - switch (pRpc->msgType) { + switch (msgType) { case TDMT_DND_CONFIG_DNODE: code = dmProcessConfigReq(pDnode, pMsg); break; @@ -148,9 +149,14 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { break; } - if (pRpc->msgType & 1u) { - if (code != 0) code = terrno; - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId}; + if (msgType & 1u) { + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = { + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .code = code, + .refId = pMsg->rpcMsg.refId, + }; rpcSendResponse(&rsp); } @@ -160,7 +166,13 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnode *pDnode) { - SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pDnode}; + SSingleWorkerCfg cfg = { + .min = 1, + .max = 1, + .name = "dnode-mgmt", + .fp = (FItem)dmProcessMgmtQueue, + .param = pDnode, + }; if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) { dError("failed to start dnode-mgmt worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index 230fa23674..d3204039e6 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -18,7 +18,11 @@ static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = { - .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, .refId = pMsg->rpcMsg.refId}; + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .code = code, + .refId = pMsg->rpcMsg.refId, + }; tmsgSendRsp(&rpcRsp); dTrace("msg:%p, is freed", pMsg); @@ -103,7 +107,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SBnodeMgmt * pMgmt = pWrapper->pMgmt; + SBnodeMgmt *pMgmt = pWrapper->pMgmt; SMultiWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -112,7 +116,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SBnodeMgmt * pMgmt = pWrapper->pMgmt; + SBnodeMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -121,7 +125,12 @@ int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = { + .max = 1, + .name = "bnode-write", + .fp = (FItems)bmProcessWriteQueue, + .param = pMgmt, + }; if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { dError("failed to start bnode-write worker since %s", terrstr()); return -1; @@ -129,7 +138,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { if (tsMultiProcess) { SSingleWorkerCfg mCfg = { - .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue, .param = pMgmt}; + .min = 1, + .max = 1, + .name = "bnode-monitor", + .fp = (FItem)bmProcessMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start bnode-monitor worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 75c48e79eb..83c832a41e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -22,7 +22,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { int32_t maxLen = 4096; char *content = taosMemoryCalloc(1, maxLen + 1); cJSON *root = NULL; - char file[PATH_MAX]; + char file[PATH_MAX] = {0}; TdFilePtr pFile = NULL; snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP); @@ -30,39 +30,39 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { if (pFile == NULL) { // dDebug("file %s not exist", file); code = 0; - goto PRASE_MNODE_OVER; + goto _OVER; } len = (int32_t)taosReadFile(pFile, content, maxLen); if (len <= 0) { dError("failed to read %s since content is null", file); - goto PRASE_MNODE_OVER; + goto _OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { dError("failed to read %s since invalid json format", file); - goto PRASE_MNODE_OVER; + goto _OVER; } cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); if (!deployed || deployed->type != cJSON_Number) { dError("failed to read %s since deployed not found", file); - goto PRASE_MNODE_OVER; + goto _OVER; } *pDeployed = deployed->valueint; cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); if (!mnodes || mnodes->type != cJSON_Array) { dError("failed to read %s since nodes not found", file); - goto PRASE_MNODE_OVER; + goto _OVER; } pMgmt->replica = cJSON_GetArraySize(mnodes); if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); - goto PRASE_MNODE_OVER; + goto _OVER; } for (int32_t i = 0; i < pMgmt->replica; ++i) { @@ -74,21 +74,21 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { cJSON *id = cJSON_GetObjectItem(node, "id"); if (!id || id->type != cJSON_Number) { dError("failed to read %s since id not found", file); - goto PRASE_MNODE_OVER; + goto _OVER; } pReplica->id = id->valueint; cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { dError("failed to read %s since fqdn not found", file); - goto PRASE_MNODE_OVER; + goto _OVER; } tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); cJSON *port = cJSON_GetObjectItem(node, "port"); if (!port || port->type != cJSON_Number) { dError("failed to read %s since port not found", file); - goto PRASE_MNODE_OVER; + goto _OVER; } pReplica->port = port->valueint; } @@ -96,7 +96,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) { code = 0; dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); -PRASE_MNODE_OVER: +_OVER: if (content != NULL) taosMemoryFree(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index db69b62e58..0bf846b7fc 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -161,9 +161,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) { SMnodeOpt option = {0}; if (!deployed) { dInfo("mnode start to deploy"); - // if (pWrapper->procType == DND_PROC_CHILD) { - pWrapper->pDnode->data.dnodeId = 1; - // } + pWrapper->pDnode->data.dnodeId = 1; mmBuildOptionForDeploy(pMgmt, &option); } else { dInfo("mnode start to open"); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 1f27b314e2..aac5bbc16a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -17,42 +17,48 @@ #include "mmInt.h" static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, - .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen}; + SRpcMsg rsp = { + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .refId = pMsg->rpcMsg.refId, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen, + }; tmsgSendRsp(&rsp); } static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; dTrace("msg:%p, get from mnode queue", pMsg); - SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; - if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) { - code = mmProcessAlterReq(pMgmt, pMsg); - } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) { - code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); - } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_LOAD) { - code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); - } else { - pMsg->pNode = pMgmt->pMnode; - code = mndProcessMsg(pMsg); + switch (msgType) { + case TDMT_DND_ALTER_MNODE: + code = mmProcessAlterReq(pMgmt, pMsg); + break; + case TDMT_MON_MM_INFO: + code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); + break; + case TDMT_MON_MM_LOAD: + code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg); + break; + default: + pMsg->pNode = pMgmt->pMnode; + code = mndProcessMsg(pMsg); } - if (pRpc->msgType & 1U) { - if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (msgType & 1U) { + if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; mmSendRsp(pMsg, code); } } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pRpc->pCont); + rpcFreeCont(pMsg->rpcMsg.pCont); taosFreeQitem(pMsg); } @@ -78,38 +84,38 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static void mmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { +static void mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); taosWriteQitem(pWorker->queue, pMsg); } int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutMsgToWorker(&pMgmt->writeWorker, pMsg); + mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg); return 0; } int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutMsgToWorker(&pMgmt->syncWorker, pMsg); + mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg); return 0; } int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutMsgToWorker(&pMgmt->readWorker, pMsg); + mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg); return 0; } int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutMsgToWorker(&pMgmt->queryWorker, pMsg); + mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg); return 0; } int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; - mmPutMsgToWorker(&pMgmt->monitorWorker, pMsg); + mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); return 0; } @@ -144,40 +150,62 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { - SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads, - .max = tsNumOfMnodeQueryThreads, - .name = "mnode-query", - .fp = (FItem)mmProcessQueryQueue, - .param = pMgmt}; + SSingleWorkerCfg qCfg = { + .min = tsNumOfMnodeQueryThreads, + .max = tsNumOfMnodeQueryThreads, + .name = "mnode-query", + .fp = (FItem)mmProcessQueryQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { dError("failed to start mnode-query worker since %s", terrstr()); return -1; } - SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads, - .max = tsNumOfMnodeReadThreads, - .name = "mnode-read", - .fp = (FItem)mmProcessQueue, - .param = pMgmt}; + SSingleWorkerCfg rCfg = { + .min = tsNumOfMnodeReadThreads, + .max = tsNumOfMnodeReadThreads, + .name = "mnode-read", + .fp = (FItem)mmProcessQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } - SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg wCfg = { + .min = 1, + .max = 1, + .name = "mnode-write", + .fp = (FItem)mmProcessQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) { dError("failed to start mnode-write worker since %s", terrstr()); return -1; } - SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg sCfg = { + .min = 1, + .max = 1, + .name = "mnode-sync", + .fp = (FItem)mmProcessQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { dError("failed to start mnode mnode-sync worker since %s", terrstr()); return -1; } if (tsMultiProcess) { - SSingleWorkerCfg mCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + SSingleWorkerCfg mCfg = { + .min = 1, + .max = 1, + .name = "mnode-monitor", + .fp = (FItem)mmProcessQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start mnode mnode-monitor worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index da85ee64a8..965d35cb3e 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -17,12 +17,14 @@ #include "qmInt.h" static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, - .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen}; + SRpcMsg rsp = { + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .refId = pMsg->rpcMsg.refId, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen, + }; tmsgSendRsp(&rsp); } @@ -145,22 +147,26 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { } int32_t qmStartWorker(SQnodeMgmt *pMgmt) { - SSingleWorkerCfg queryCfg = {.min = tsNumOfVnodeQueryThreads, - .max = tsNumOfVnodeQueryThreads, - .name = "qnode-query", - .fp = (FItem)qmProcessQueryQueue, - .param = pMgmt}; + SSingleWorkerCfg queryCfg = { + .min = tsNumOfVnodeQueryThreads, + .max = tsNumOfVnodeQueryThreads, + .name = "qnode-query", + .fp = (FItem)qmProcessQueryQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { dError("failed to start qnode-query worker since %s", terrstr()); return -1; } - SSingleWorkerCfg fetchCfg = {.min = tsNumOfQnodeFetchThreads, - .max = tsNumOfQnodeFetchThreads, - .name = "qnode-fetch", - .fp = (FItem)qmProcessFetchQueue, - .param = pMgmt}; + SSingleWorkerCfg fetchCfg = { + .min = tsNumOfQnodeFetchThreads, + .max = tsNumOfQnodeFetchThreads, + .name = "qnode-fetch", + .fp = (FItem)qmProcessFetchQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) { dError("failed to start qnode-fetch worker since %s", terrstr()); @@ -169,7 +175,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { if (tsMultiProcess) { SSingleWorkerCfg mCfg = { - .min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonitorQueue, .param = pMgmt}; + .min = 1, + .max = 1, + .name = "qnode-monitor", + .fp = (FItem)qmProcessMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start qnode-monitor worker since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 25872aec55..2ae439bbd6 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -17,12 +17,14 @@ #include "smInt.h" static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, - .ahandle = pMsg->rpcMsg.ahandle, - .refId = pMsg->rpcMsg.refId, - .code = code, - .pCont = pMsg->pRsp, - .contLen = pMsg->rspLen}; + SRpcMsg rsp = { + .handle = pMsg->rpcMsg.handle, + .ahandle = pMsg->rpcMsg.ahandle, + .refId = pMsg->rpcMsg.refId, + .code = code, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen, + }; tmsgSendRsp(&rsp); } @@ -90,7 +92,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = { + .max = 1, + .name = "snode-unique", + .fp = smProcessUniqueQueue, + .param = pMgmt, + }; if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) { dError("failed to start snode-unique worker since %s", terrstr()); return -1; @@ -101,11 +108,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { } } - SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads, - .max = tsNumOfSnodeSharedThreads, - .name = "snode-shared", - .fp = (FItem)smProcessSharedQueue, - .param = pMgmt}; + SSingleWorkerCfg cfg = { + .min = tsNumOfSnodeSharedThreads, + .max = tsNumOfSnodeSharedThreads, + .name = "snode-shared", + .fp = (FItem)smProcessSharedQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) { dError("failed to start snode shared-worker since %s", terrstr()); @@ -114,7 +123,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { if (tsMultiProcess) { SSingleWorkerCfg mCfg = { - .min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt}; + .min = 1, + .max = 1, + .name = "snode-monitor", + .fp = (FItem)smProcessMonitorQueue, + .param = pMgmt, + }; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start snode-monitor worker since %s", terrstr()); return -1; @@ -150,7 +164,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { } int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt * pMgmt = pWrapper->pMgmt; + SSnodeMgmt *pMgmt = pWrapper->pMgmt; SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; @@ -163,7 +177,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt * pMgmt = pWrapper->pMgmt; + SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->monitorWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); @@ -172,7 +186,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt * pMgmt = pWrapper->pMgmt; + SSnodeMgmt *pMgmt = pWrapper->pMgmt; int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { @@ -186,7 +200,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SSnodeMgmt * pMgmt = pWrapper->pMgmt; + SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); -- GitLab