From d58c42b481a013e881b2700d9e30ef857b27ccb8 Mon Sep 17 00:00:00 2001 From: jtao1735 Date: Mon, 4 May 2020 13:10:50 +0000 Subject: [PATCH] working version --- src/client/src/tscSql.c | 4 +- src/client/src/tscSystem.c | 2 +- src/common/inc/tglobal.h | 4 +- src/common/src/tglobal.c | 11 +- src/cq/src/cqMain.c | 8 +- .../inc/{dnodeMClient.h => dnodeDnode.h} | 11 +- src/dnode/inc/dnodeMnode.h | 30 -- src/dnode/src/dnodeClient.c | 77 +++ src/dnode/src/dnodeMClient.c | 468 ------------------ src/dnode/src/dnodeMain.c | 11 +- src/dnode/src/dnodeMgmt.c | 399 ++++++++++++++- src/dnode/src/{dnodeMnode.c => dnodeServer.c} | 44 +- src/dnode/src/dnodeShell.c | 2 +- src/inc/dnode.h | 6 + src/inc/mnode.h | 3 +- src/inc/taosdef.h | 5 +- src/kit/shell/src/shellEngine.c | 2 +- src/kit/shell/src/shellImport.c | 2 +- src/kit/shell/src/shellLinux.c | 2 +- src/mnode/inc/{mgmtDServer.h => mgmtServer.h} | 4 +- src/mnode/src/mgmtDClient.c | 87 ---- src/mnode/src/mgmtDnode.c | 6 +- src/mnode/src/mgmtMain.c | 12 +- src/mnode/src/{mgmtDServer.c => mgmtServer.c} | 70 +-- src/mnode/src/mgmtTable.c | 19 +- src/mnode/src/mgmtVgroup.c | 15 +- 26 files changed, 576 insertions(+), 728 deletions(-) rename src/dnode/inc/{dnodeMClient.h => dnodeDnode.h} (80%) delete mode 100644 src/dnode/inc/dnodeMnode.h create mode 100644 src/dnode/src/dnodeClient.c delete mode 100644 src/dnode/src/dnodeMClient.c rename src/dnode/src/{dnodeMnode.c => dnodeServer.c} (73%) rename src/mnode/inc/{mgmtDServer.h => mgmtServer.h} (93%) delete mode 100644 src/mnode/src/mgmtDClient.c rename src/mnode/src/{mgmtDServer.c => mgmtServer.c} (55%) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 7d6768b144..d835a3c0aa 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -77,7 +77,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con tscMgmtIpSet.inUse = 0; tscMgmtIpSet.numOfIps = 1; strcpy(tscMgmtIpSet.fqdn[0], ip); - tscMgmtIpSet.port[0] = port? port: tsMnodeShellPort; + tscMgmtIpSet.port[0] = port? port: tsDnodeShellPort; } else { if (tsFirst[0] != 0) { taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]); @@ -100,7 +100,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con strncpy(pObj->user, user, TSDB_USER_LEN); taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass); - pObj->mgmtPort = port ? port : tsMnodeShellPort; + pObj->mgmtPort = port ? port : tsDnodeShellPort; if (db) { int32_t len = strlen(db); diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index aa3c836ba0..7ddeed4565 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -56,7 +56,7 @@ int32_t tscInitRpc(const char *user, const char *secret) { if (pDnodeConn == NULL) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; - rpcInit.label = "TSC-vnode"; + rpcInit.label = "TSC"; rpcInit.numOfThreads = tscNumOfThreads; rpcInit.cfp = tscProcessMsgFromServer; rpcInit.sessions = tsMaxVnodeConnections; diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index dd0dd230fd..ef12ece393 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -55,10 +55,8 @@ extern char tsFirst[]; extern char tsSecond[]; extern char tsLocalEp[]; extern uint16_t tsServerPort; -extern uint16_t tsMnodeDnodePort; -extern uint16_t tsMnodeShellPort; extern uint16_t tsDnodeShellPort; -extern uint16_t tsDnodeMnodePort; +extern uint16_t tsDnodeDnodePort; extern uint16_t tsSyncPort; extern int32_t tsStatusInterval; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index b482ca64f4..0af0710c85 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -66,11 +66,9 @@ char tsSecond[TSDB_FQDN_LEN] = {0}; char tsArbitrator[TSDB_FQDN_LEN] = {0}; char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port uint16_t tsServerPort = 6030; -uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030] -uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035] -uint16_t tsMnodeDnodePort = 6040; // udp/tcp -uint16_t tsDnodeMnodePort = 6045; // udp/tcp -uint16_t tsSyncPort = 6050; +uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035] +uint16_t tsDnodeDnodePort = 6035; // udp/tcp +uint16_t tsSyncPort = 6040; int32_t tsStatusInterval = 1; // second int32_t tsShellActivityTimer = 3; // second @@ -1245,8 +1243,7 @@ bool taosCheckGlobalCfg() { tsVersion = 10 * tsVersion; tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035] - tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp - tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp + tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp tsSyncPort = tsServerPort + TSDB_PORT_SYNC; return true; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 62b9a41494..e4f3142b89 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -26,10 +26,10 @@ #include "tcq.h" #include "taos.h" -#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);} -#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);} -#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);} -#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);} +#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);} +#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);} +#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);} +#define cPrint(...) {taosPrintLog("CQ ", 255, __VA_ARGS__);} typedef struct { int vgId; diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeDnode.h similarity index 80% rename from src/dnode/inc/dnodeMClient.h rename to src/dnode/inc/dnodeDnode.h index 6d413ada88..2ce8d80c0f 100644 --- a/src/dnode/inc/dnodeMClient.h +++ b/src/dnode/inc/dnodeDnode.h @@ -13,16 +13,17 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_DNODE_MCLIENT_H -#define TDENGINE_DNODE_MCLIENT_H +#ifndef TDENGINE_DNODE_DNODE_H +#define TDENGINE_DNODE_DNODE_H #ifdef __cplusplus extern "C" { #endif -int32_t dnodeInitMClient(); -void dnodeCleanupMClient(); -void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); +int32_t dnodeInitServer(); +void dnodeCleanupServer(); +int32_t dnodeInitClient(); +void dnodeCleanupClient(); #ifdef __cplusplus } diff --git a/src/dnode/inc/dnodeMnode.h b/src/dnode/inc/dnodeMnode.h deleted file mode 100644 index 76a65a06c9..0000000000 --- a/src/dnode/inc/dnodeMnode.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_DNODE_MNODE_H -#define TDENGINE_DNODE_MNODE_H - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t dnodeInitMnode(); -void dnodeCleanupMnode(); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/dnode/src/dnodeClient.c b/src/dnode/src/dnodeClient.c new file mode 100644 index 0000000000..aa3ec0595f --- /dev/null +++ b/src/dnode/src/dnodeClient.c @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taosmsg.h" +#include "trpc.h" +#include "tutil.h" +#include "tglobal.h" +#include "dnode.h" +#include "dnodeLog.h" +#include "dnodeMgmt.h" + +static void *tsDnodeClientRpc; +static void (*dnodeProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg); +extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet); + +int32_t dnodeInitClient() { + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.label = "DND-C"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = dnodeProcessRspFromDnode; + rpcInit.ufp = dnodeUpdateIpSet; + rpcInit.sessions = 100; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = tsShellActivityTimer * 2000; + rpcInit.user = "t"; + rpcInit.ckey = "key"; + rpcInit.secret = "secret"; + + tsDnodeClientRpc = rpcOpen(&rpcInit); + if (tsDnodeClientRpc == NULL) { + dError("failed to init mnode rpc client"); + return -1; + } + + dPrint("inter-dndoes rpc client is opened"); + return 0; +} + +void dnodeCleanupClient() { + if (tsDnodeClientRpc) { + rpcClose(tsDnodeClientRpc); + tsDnodeClientRpc = NULL; + dPrint("inter-dnodes rpc client is closed"); + } +} + +static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) { + if (dnodeProcessDnodeRspFp[pMsg->msgType]) { + (*dnodeProcessDnodeRspFp[pMsg->msgType])(pMsg); + } else { + dError("%s is not processed", taosMsg[pMsg->msgType]); + } + rpcFreeCont(pMsg->pCont); +} + +void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { + dnodeProcessDnodeRspFp[msgType] = fp; +} + +void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { + rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg); +} diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c deleted file mode 100644 index 3aa863799b..0000000000 --- a/src/dnode/src/dnodeMClient.c +++ /dev/null @@ -1,468 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "cJSON.h" -#include "taosmsg.h" -#include "trpc.h" -#include "tutil.h" -#include "tsync.h" -#include "ttime.h" -#include "ttimer.h" -#include "tbalance.h" -#include "tglobal.h" -#include "vnode.h" -#include "mnode.h" -#include "dnode.h" -#include "dnodeLog.h" -#include "dnodeMClient.h" -#include "dnodeModule.h" -#include "dnodeMgmt.h" - -#define MPEER_CONTENT_LEN 2000 - -static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes); -static bool dnodeReadMnodeInfos(); -static void dnodeSaveMnodeInfos(); -static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg); -static bool dnodeReadDnodeCfg(); -static void dnodeSaveDnodeCfg(); -static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); -static void dnodeProcessStatusRsp(SRpcMsg *pMsg); -static void dnodeSendStatusMsg(void *handle, void *tmrId); -static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); - -static void *tsDnodeMClientRpc = NULL; -static void *tsDnodeTmr = NULL; -static void *tsStatusTimer = NULL; -static uint32_t tsRebootTime; - -static SRpcIpSet tsMnodeIpSet = {0}; -static SDMMnodeInfos tsMnodeInfos = {0}; -static SDMDnodeCfg tsDnodeCfg = {0}; - -void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { - dTrace("mgmt IP list is changed for ufp is called"); - tsMnodeIpSet = *pIpSet; -} - -void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) { - SRpcIpSet *ipSet = ipSetRaw; - ipSet->numOfIps = tsMnodeInfos.nodeNum; - ipSet->inUse = tsMnodeInfos.inUse; - for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) { - taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]); - ipSet->port[i] += TSDB_PORT_MNODEDNODE; - } -} - -int32_t dnodeInitMClient() { - dnodeReadDnodeCfg(); - tsRebootTime = taosGetTimestampSec(); - - tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); - if (tsDnodeTmr == NULL) { - dError("failed to init dnode timer"); - return -1; - } - - if (!dnodeReadMnodeInfos()) { - memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet)); - memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); - tsMnodeIpSet.numOfIps = 1; - taosGetFqdnPortFromEp(tsFirst, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]); - tsMnodeIpSet.port[0] += TSDB_PORT_MNODEDNODE; - if (strcmp(tsSecond, tsFirst) != 0) { - tsMnodeIpSet.numOfIps = 2; - taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]); - tsMnodeIpSet.port[1] += TSDB_PORT_MNODEDNODE; - } - } else { - tsMnodeIpSet.inUse = tsMnodeInfos.inUse; - tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]); - tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE; - } - } - - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "DND-MC"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessRspFromMnode; - rpcInit.ufp = dnodeUpdateIpSet; - rpcInit.sessions = 100; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = tsShellActivityTimer * 2000; - rpcInit.user = "t"; - rpcInit.ckey = "key"; - rpcInit.secret = "secret"; - - tsDnodeMClientRpc = rpcOpen(&rpcInit); - if (tsDnodeMClientRpc == NULL) { - dError("failed to init mnode rpc client"); - return -1; - } - - tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; - taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); - - dPrint("mnode rpc client is opened"); - return 0; -} - -void dnodeCleanupMClient() { - if (tsStatusTimer != NULL) { - taosTmrStopA(&tsStatusTimer); - tsStatusTimer = NULL; - } - - if (tsDnodeTmr != NULL) { - taosTmrCleanUp(tsDnodeTmr); - tsDnodeTmr = NULL; - } - - if (tsDnodeMClientRpc) { - rpcClose(tsDnodeMClientRpc); - tsDnodeMClientRpc = NULL; - dPrint("mnode rpc client is closed"); - } -} - -static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { - if (tsDnodeProcessMgmtRspFp[pMsg->msgType]) { - (*tsDnodeProcessMgmtRspFp[pMsg->msgType])(pMsg); - } else { - dError("%s is not processed in dnode mclient", taosMsg[pMsg->msgType]); - SRpcMsg rpcRsp = {.pCont = 0, .contLen = 0, .code = TSDB_CODE_OPS_NOT_SUPPORT, .handle = pMsg->handle}; - rpcSendResponse(&rpcRsp); - } - - rpcFreeCont(pMsg->pCont); -} - -static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { - if (pMsg->code != TSDB_CODE_SUCCESS) { - dError("status rsp is received, error:%s", tstrerror(pMsg->code)); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - return; - } - - SDMStatusRsp *pStatusRsp = pMsg->pCont; - SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes; - if (pMnodes->nodeNum <= 0) { - dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - return; - } - - SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; - pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); - pCfg->moduleStatus = htonl(pCfg->moduleStatus); - pCfg->dnodeId = htonl(pCfg->dnodeId); - - for (int32_t i = 0; i < pMnodes->nodeNum; ++i) { - SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i]; - pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId); - } - - SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess; - for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) { - pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId); - } - - dnodeProcessModuleStatus(pCfg->moduleStatus); - dnodeUpdateDnodeCfg(pCfg); - dnodeUpdateMnodeInfos(pMnodes); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); -} - -static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { - bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0); - bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0); - if (!(mnodesChanged || mnodesNotInit)) return; - - memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)); - - tsMnodeIpSet.inUse = tsMnodeInfos.inUse; - tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum; - for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { - taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]); - tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE; - } - - dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); - for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { - dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp); - } - - dnodeSaveMnodeInfos(); - sdbUpdateSync(); -} - -void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { - if (tsDnodeMClientRpc) { - rpcSendRequest(tsDnodeMClientRpc, &tsMnodeIpSet, rpcMsg); - } -} - -static bool dnodeReadMnodeInfos() { - char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); - FILE *fp = fopen(ipFile, "r"); - if (!fp) { - dTrace("failed to read mnode mgmtIpList.json, file not exist"); - return false; - } - - bool ret = false; - int maxLen = 2000; - char *content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - free(content); - fclose(fp); - dError("failed to read mnode mgmtIpList.json, content is null"); - return false; - } - - cJSON* root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read mnode mgmtIpList.json, invalid json format"); - goto PARSE_OVER; - } - - cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); - if (!inUse || inUse->type != cJSON_Number) { - dError("failed to read mnode mgmtIpList.json, inUse not found"); - goto PARSE_OVER; - } - tsMnodeInfos.inUse = inUse->valueint; - - cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); - if (!nodeNum || nodeNum->type != cJSON_Number) { - dError("failed to read mnode mgmtIpList.json, nodeNum not found"); - goto PARSE_OVER; - } - tsMnodeInfos.nodeNum = nodeNum->valueint; - - cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); - if (!nodeInfos || nodeInfos->type != cJSON_Array) { - dError("failed to read mnode mgmtIpList.json, nodeInfos not found"); - goto PARSE_OVER; - } - - int size = cJSON_GetArraySize(nodeInfos); - if (size != tsMnodeInfos.nodeNum) { - dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched"); - goto PARSE_OVER; - } - - for (int i = 0; i < size; ++i) { - cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i); - if (nodeInfo == NULL) continue; - - cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); - if (!nodeId || nodeId->type != cJSON_Number) { - dError("failed to read mnode mgmtIpList.json, nodeId not found"); - goto PARSE_OVER; - } - tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint; - - cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); - if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - dError("failed to read mnode mgmtIpList.json, nodeName not found"); - goto PARSE_OVER; - } - strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN); - } - - ret = true; - - dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); - for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { - dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp); - } - -PARSE_OVER: - free(content); - cJSON_Delete(root); - fclose(fp); - return ret; -} - -static void dnodeSaveMnodeInfos() { - char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); - FILE *fp = fopen(ipFile, "w"); - if (!fp) return; - - int32_t len = 0; - int32_t maxLen = 2000; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMnodeInfos.inUse); - len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMnodeInfos.nodeNum); - len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); - for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { - len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId); - len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp); - if (i < tsMnodeInfos.nodeNum -1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - fclose(fp); - free(content); - - dPrint("save mnode iplist successed"); -} - -char *dnodeGetMnodeMasterEp() { - return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp; -} - -void* dnodeGetMnodeInfos() { - return &tsMnodeInfos; -} - -static void dnodeSendStatusMsg(void *handle, void *tmrId) { - if (tsDnodeTmr == NULL) { - dError("dnode timer is already released"); - return; - } - - if (tsStatusTimer == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - dError("failed to start status timer"); - return; - } - - int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); - SDMStatusMsg *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); - dError("failed to malloc status message"); - return; - } - - //strcpy(pStatus->dnodeName, tsDnodeName); - pStatus->version = htonl(tsVersion); - pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId); - strcpy(pStatus->dnodeEp, tsLocalEp); - pStatus->lastReboot = htonl(tsRebootTime); - pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); - pStatus->numOfCores = htons((uint16_t) tsNumOfCores); - pStatus->diskAvailable = tsAvailDataDirGB; - pStatus->alternativeRole = (uint8_t) tsAlternativeRole; - - vnodeBuildStatusMsg(pStatus); - contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); - pStatus->openVnodes = htons(pStatus->openVnodes); - - SRpcMsg rpcMsg = { - .pCont = pStatus, - .contLen = contLen, - .msgType = TSDB_MSG_TYPE_DM_STATUS - }; - - dnodeSendMsgToMnode(&rpcMsg); -} - -static bool dnodeReadDnodeCfg() { - char dnodeCfgFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); - - FILE *fp = fopen(dnodeCfgFile, "r"); - if (!fp) { - dTrace("failed to read dnodeCfg.json, file not exist"); - return false; - } - - bool ret = false; - int maxLen = 100; - char *content = calloc(1, maxLen + 1); - int len = fread(content, 1, maxLen, fp); - if (len <= 0) { - free(content); - fclose(fp); - dError("failed to read dnodeCfg.json, content is null"); - return false; - } - - cJSON* root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read dnodeCfg.json, invalid json format"); - goto PARSE_CFG_OVER; - } - - cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read dnodeCfg.json, dnodeId not found"); - goto PARSE_CFG_OVER; - } - tsDnodeCfg.dnodeId = dnodeId->valueint; - - ret = true; - - dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId); - -PARSE_CFG_OVER: - free(content); - cJSON_Delete(root); - fclose(fp); - return ret; -} - -static void dnodeSaveDnodeCfg() { - char dnodeCfgFile[TSDB_FILENAME_LEN] = {0}; - sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); - - FILE *fp = fopen(dnodeCfgFile, "w"); - if (!fp) return; - - int32_t len = 0; - int32_t maxLen = 100; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId); - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - fclose(fp); - free(content); - - dPrint("save dnodeId successed"); -} - -void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) { - if (tsDnodeCfg.dnodeId == 0) { - dPrint("dnodeId is set to %d", pCfg->dnodeId); - tsDnodeCfg.dnodeId = pCfg->dnodeId; - dnodeSaveDnodeCfg(); - } -} - -int32_t dnodeGetDnodeId() { - return tsDnodeCfg.dnodeId; -} diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 940b884927..f6bd026703 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -23,9 +23,8 @@ #include "tglobal.h" #include "dnode.h" #include "dnodeLog.h" -#include "dnodeMClient.h" #include "dnodeMgmt.h" -#include "dnodeMnode.h" +#include "dnodeDnode.h" #include "dnodeModule.h" #include "dnodeRead.h" #include "dnodeShell.h" @@ -167,9 +166,9 @@ static int32_t dnodeInitSystem() { if (dnodeInitStorage() != 0) return -1; if (dnodeInitRead() != 0) return -1; if (dnodeInitWrite() != 0) return -1; - if (dnodeInitMClient() != 0) return -1; + if (dnodeInitClient() != 0) return -1; if (dnodeInitModules() != 0) return -1; - if (dnodeInitMnode() != 0) return -1; + if (dnodeInitServer() != 0) return -1; if (dnodeInitMgmt() != 0) return -1; if (dnodeInitShell() != 0) return -1; @@ -185,9 +184,9 @@ static void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeCleanupShell(); - dnodeCleanupMnode(); + dnodeCleanupServer(); dnodeCleanupMgmt(); - dnodeCleanupMClient(); + dnodeCleanupClient(); dnodeCleanupWrite(); dnodeCleanupRead(); dnodeCleanUpModules(); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index fbf1ceea71..db25cfd23b 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -15,19 +15,47 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "cJSON.h" #include "ihash.h" #include "taoserror.h" #include "taosmsg.h" +#include "ttime.h" +#include "ttimer.h" #include "trpc.h" #include "tsdb.h" #include "twal.h" -#include "vnode.h" +#include "tsync.h" +#include "ttime.h" +#include "ttimer.h" +#include "tbalance.h" #include "tglobal.h" +#include "dnode.h" +#include "vnode.h" +#include "mnode.h" #include "dnodeLog.h" -#include "dnodeMClient.h" #include "dnodeMgmt.h" #include "dnodeRead.h" #include "dnodeWrite.h" +#include "dnodeModule.h" + +#define MPEER_CONTENT_LEN 2000 + +static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes); +static bool dnodeReadMnodeInfos(); +static void dnodeSaveMnodeInfos(); +static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg); +static bool dnodeReadDnodeCfg(); +static void dnodeSaveDnodeCfg(); +static void dnodeProcessStatusRsp(SRpcMsg *pMsg); +static void dnodeSendStatusMsg(void *handle, void *tmrId); + +static void *tsDnodeTmr = NULL; +static void *tsStatusTimer = NULL; +static uint32_t tsRebootTime; + +static SRpcIpSet tsMnodeIpSet = {0}; +static SDMMnodeInfos tsMnodeInfos = {0}; +static SDMDnodeCfg tsDnodeCfg = {0}; static int32_t dnodeOpenVnodes(); static void dnodeCloseVnodes(); @@ -43,15 +71,59 @@ int32_t dnodeInitMgmt() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg; + dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); + dnodeReadDnodeCfg(); + tsRebootTime = taosGetTimestampSec(); + + tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM"); + if (tsDnodeTmr == NULL) { + dError("failed to init dnode timer"); + return -1; + } + + if (!dnodeReadMnodeInfos()) { + memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet)); + memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos)); + tsMnodeIpSet.numOfIps = 1; + taosGetFqdnPortFromEp(tsFirst, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]); + tsMnodeIpSet.port[0] += TSDB_PORT_DNODEDNODE; + if (strcmp(tsSecond, tsFirst) != 0) { + tsMnodeIpSet.numOfIps = 2; + taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]); + tsMnodeIpSet.port[1] += TSDB_PORT_DNODEDNODE; + } + } else { + tsMnodeIpSet.inUse = tsMnodeInfos.inUse; + tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum; + for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { + taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]); + tsMnodeIpSet.port[i] += TSDB_PORT_DNODEDNODE; + } + } + int32_t code = dnodeOpenVnodes(); if (code != TSDB_CODE_SUCCESS) { return -1; } + taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer); + + dPrint("dnode mgmt is initialized"); + return TSDB_CODE_SUCCESS; } void dnodeCleanupMgmt() { + if (tsStatusTimer != NULL) { + taosTmrStopA(&tsStatusTimer); + tsStatusTimer = NULL; + } + + if (tsDnodeTmr != NULL) { + taosTmrCleanUp(tsDnodeTmr); + tsDnodeTmr = NULL; + } + dnodeCloseVnodes(); } @@ -193,3 +265,326 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) { SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont; return taosCfgDynamicOptions(pCfg->config); } + + +void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) { + dTrace("mgmt IP list is changed for ufp is called"); + tsMnodeIpSet = *pIpSet; +} + +void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) { + SRpcIpSet *ipSet = ipSetRaw; + ipSet->numOfIps = tsMnodeInfos.nodeNum; + ipSet->inUse = tsMnodeInfos.inUse; + for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) { + taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]); + ipSet->port[i] += TSDB_PORT_DNODEDNODE; + } +} + +static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { + if (pMsg->code != TSDB_CODE_SUCCESS) { + dError("status rsp is received, error:%s", tstrerror(pMsg->code)); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + return; + } + + SDMStatusRsp *pStatusRsp = pMsg->pCont; + SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes; + if (pMnodes->nodeNum <= 0) { + dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + return; + } + + SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; + pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); + pCfg->moduleStatus = htonl(pCfg->moduleStatus); + pCfg->dnodeId = htonl(pCfg->dnodeId); + + for (int32_t i = 0; i < pMnodes->nodeNum; ++i) { + SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i]; + pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId); + } + + SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess; + for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) { + pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId); + } + + dnodeProcessModuleStatus(pCfg->moduleStatus); + dnodeUpdateDnodeCfg(pCfg); + dnodeUpdateMnodeInfos(pMnodes); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); +} + +static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) { + bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0); + bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0); + if (!(mnodesChanged || mnodesNotInit)) return; + + memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)); + + tsMnodeIpSet.inUse = tsMnodeInfos.inUse; + tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum; + for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { + taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]); + tsMnodeIpSet.port[i] += TSDB_PORT_DNODEDNODE; + } + + dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); + for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { + dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp); + } + + dnodeSaveMnodeInfos(); + sdbUpdateSync(); +} + +static bool dnodeReadMnodeInfos() { + char ipFile[TSDB_FILENAME_LEN] = {0}; + sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); + FILE *fp = fopen(ipFile, "r"); + if (!fp) { + dTrace("failed to read mnode mgmtIpList.json, file not exist"); + return false; + } + + bool ret = false; + int maxLen = 2000; + char *content = calloc(1, maxLen + 1); + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + free(content); + fclose(fp); + dError("failed to read mnode mgmtIpList.json, content is null"); + return false; + } + + cJSON* root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read mnode mgmtIpList.json, invalid json format"); + goto PARSE_OVER; + } + + cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); + if (!inUse || inUse->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, inUse not found"); + goto PARSE_OVER; + } + tsMnodeInfos.inUse = inUse->valueint; + + cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); + if (!nodeNum || nodeNum->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, nodeNum not found"); + goto PARSE_OVER; + } + tsMnodeInfos.nodeNum = nodeNum->valueint; + + cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + dError("failed to read mnode mgmtIpList.json, nodeInfos not found"); + goto PARSE_OVER; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != tsMnodeInfos.nodeNum) { + dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched"); + goto PARSE_OVER; + } + + for (int i = 0; i < size; ++i) { + cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, nodeId not found"); + goto PARSE_OVER; + } + tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint; + + cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); + if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { + dError("failed to read mnode mgmtIpList.json, nodeName not found"); + goto PARSE_OVER; + } + strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN); + } + + ret = true; + + dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse); + for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { + dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp); + } + +PARSE_OVER: + free(content); + cJSON_Delete(root); + fclose(fp); + return ret; +} + +static void dnodeSaveMnodeInfos() { + char ipFile[TSDB_FILENAME_LEN] = {0}; + sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); + FILE *fp = fopen(ipFile, "w"); + if (!fp) return; + + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMnodeInfos.inUse); + len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMnodeInfos.nodeNum); + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp); + if (i < tsMnodeInfos.nodeNum -1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fclose(fp); + free(content); + + dPrint("save mnode iplist successed"); +} + +char *dnodeGetMnodeMasterEp() { + return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp; +} + +void* dnodeGetMnodeInfos() { + return &tsMnodeInfos; +} + +static void dnodeSendStatusMsg(void *handle, void *tmrId) { + if (tsDnodeTmr == NULL) { + dError("dnode timer is already released"); + return; + } + + if (tsStatusTimer == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + dError("failed to start status timer"); + return; + } + + int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SDMStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); + dError("failed to malloc status message"); + return; + } + + //strcpy(pStatus->dnodeName, tsDnodeName); + pStatus->version = htonl(tsVersion); + pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId); + strcpy(pStatus->dnodeEp, tsLocalEp); + pStatus->lastReboot = htonl(tsRebootTime); + pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes); + pStatus->numOfCores = htons((uint16_t) tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + pStatus->alternativeRole = (uint8_t) tsAlternativeRole; + + vnodeBuildStatusMsg(pStatus); + contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); + pStatus->openVnodes = htons(pStatus->openVnodes); + + SRpcMsg rpcMsg = { + .pCont = pStatus, + .contLen = contLen, + .msgType = TSDB_MSG_TYPE_DM_STATUS + }; + + dnodeSendMsgToDnode(&tsMnodeIpSet, &rpcMsg); +} + +static bool dnodeReadDnodeCfg() { + char dnodeCfgFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); + + FILE *fp = fopen(dnodeCfgFile, "r"); + if (!fp) { + dTrace("failed to read dnodeCfg.json, file not exist"); + return false; + } + + bool ret = false; + int maxLen = 100; + char *content = calloc(1, maxLen + 1); + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + free(content); + fclose(fp); + dError("failed to read dnodeCfg.json, content is null"); + return false; + } + + cJSON* root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read dnodeCfg.json, invalid json format"); + goto PARSE_CFG_OVER; + } + + cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_Number) { + dError("failed to read dnodeCfg.json, dnodeId not found"); + goto PARSE_CFG_OVER; + } + tsDnodeCfg.dnodeId = dnodeId->valueint; + + ret = true; + + dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId); + +PARSE_CFG_OVER: + free(content); + cJSON_Delete(root); + fclose(fp); + return ret; +} + +static void dnodeSaveDnodeCfg() { + char dnodeCfgFile[TSDB_FILENAME_LEN] = {0}; + sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir); + + FILE *fp = fopen(dnodeCfgFile, "w"); + if (!fp) return; + + int32_t len = 0; + int32_t maxLen = 100; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + fclose(fp); + free(content); + + dPrint("save dnodeId successed"); +} + +void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) { + if (tsDnodeCfg.dnodeId == 0) { + dPrint("dnodeId is set to %d", pCfg->dnodeId); + tsDnodeCfg.dnodeId = pCfg->dnodeId; + dnodeSaveDnodeCfg(); + } +} + +int32_t dnodeGetDnodeId() { + return tsDnodeCfg.dnodeId; +} + diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeServer.c similarity index 73% rename from src/dnode/src/dnodeMnode.c rename to src/dnode/src/dnodeServer.c index 75c09d43ba..169cd6cffa 100644 --- a/src/dnode/src/dnodeMnode.c +++ b/src/dnode/src/dnodeServer.c @@ -23,10 +23,10 @@ #include "dnodeWrite.h" static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); -static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg); -static void *tsDnodeMnodeRpc = NULL; +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg); +static void *tsDnodeServerRpc = NULL; -int32_t dnodeInitMnode() { +int32_t dnodeInitServer() { dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite; dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite; @@ -38,33 +38,35 @@ int32_t dnodeInitMnode() { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsDnodeMnodePort; - rpcInit.label = "DND-MS"; + rpcInit.localPort = tsDnodeDnodePort; + rpcInit.label = "DND-S"; rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessMsgFromMnode; + rpcInit.cfp = dnodeProcessReqMsgFromDnode; rpcInit.sessions = 100; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 2000; - tsDnodeMnodeRpc = rpcOpen(&rpcInit); - if (tsDnodeMnodeRpc == NULL) { - dError("failed to init mnode rpc server"); + tsDnodeServerRpc = rpcOpen(&rpcInit); + if (tsDnodeServerRpc == NULL) { + dError("failed to init inter-dnodes RPC server"); return -1; } - dPrint("mnode rpc server is opened"); + dPrint("inter-dnodes RPC server is opened"); return 0; } -void dnodeCleanupMnode() { - if (tsDnodeMnodeRpc) { - rpcClose(tsDnodeMnodeRpc); - tsDnodeMnodeRpc = NULL; - dPrint("mnode rpc server is closed"); +void dnodeCleanupServer() { + if (tsDnodeServerRpc) { + rpcClose(tsDnodeServerRpc); + tsDnodeServerRpc = NULL; + dPrint("inter-dnodes RPC server is closed"); } } -static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { +void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg); + +static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) { SRpcMsg rspMsg; rspMsg.handle = pMsg->handle; rspMsg.pCont = NULL; @@ -74,7 +76,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { rspMsg.code = TSDB_CODE_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); - dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle); + dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle); return; } @@ -83,15 +85,11 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) { rpcSendResponse(&rspMsg); return; } - + if (dnodeProcessMgmtMsgFp[pMsg->msgType]) { (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg); } else { - dError("%s is not processed in dnode mserver", taosMsg[pMsg->msgType]); - rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + mgmtProcessReqMsgFromDnode(pMsg); } } - diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 581273d21c..e6a392a341 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -49,7 +49,7 @@ int32_t dnodeInitShell() { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsMnodeShellPort; + rpcInit.localPort = tsDnodeShellPort; rpcInit.label = "SHELL"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dnodeProcessMsgFromShell; diff --git a/src/inc/dnode.h b/src/inc/dnode.h index c4b893ab86..99b9046aac 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "trpc.h" + typedef struct { int32_t queryReqNum; int32_t submitReqNum; @@ -47,6 +49,10 @@ void dnodeGetMnodeDnodeIpSet(void *ipSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); +void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); +void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); +void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg); + #ifdef __cplusplus } #endif diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 21955e29c1..37fec24c20 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -26,7 +26,8 @@ void mgmtCleanUpSystem(); void mgmtStopSystem(); void sdbUpdateSync(); -void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); +void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg); +void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg); #ifdef __cplusplus } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 95310ae44a..ec9debaf2b 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -330,9 +330,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_CHILD_TABLES 100000 #define TSDB_PORT_DNODESHELL 0 -#define TSDB_PORT_DNODEMNODE 10 -#define TSDB_PORT_MNODEDNODE 15 -#define TSDB_PORT_SYNC 20 +#define TSDB_PORT_DNODEDNODE 5 +#define TSDB_PORT_SYNC 10 #define TAOS_QTYPE_RPC 0 #define TAOS_QTYPE_FWD 1 diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index eeaeec83f2..1bc24c6c71 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -68,7 +68,7 @@ TAOS *shellInit(struct arguments *args) { tsMeterMetaKeepTimer = 3000; // Connect to the database. - TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort); + TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort); if (con == NULL) { return con; } diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index e5c50bb74e..b29b96379b 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -229,7 +229,7 @@ static void shellRunImportThreads(struct arguments* args) ShellThreadObj *pThread = threadObj + t; pThread->threadIndex = t; pThread->totalThreads = args->threadNum; - pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort); + pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort); if (pThread->taos == NULL) { fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos)); exit(0); diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 22ffa78c81..f5a1145cf8 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -63,7 +63,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { break; case 'P': if (arg) { - tsMnodeShellPort = atoi(arg); + tsDnodeShellPort = atoi(arg); } else { fprintf(stderr, "Invalid port\n"); return -1; diff --git a/src/mnode/inc/mgmtDServer.h b/src/mnode/inc/mgmtServer.h similarity index 93% rename from src/mnode/inc/mgmtDServer.h rename to src/mnode/inc/mgmtServer.h index 937ae8f1ac..180e893cb0 100644 --- a/src/mnode/inc/mgmtDServer.h +++ b/src/mnode/inc/mgmtServer.h @@ -20,8 +20,8 @@ extern "C" { #endif -int32_t mgmtInitDServer(); -void mgmtCleanupDServer(); +int32_t mgmtInitServer(); +void mgmtCleanupServer(); void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); #ifdef __cplusplus diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c deleted file mode 100644 index 229964e1d6..0000000000 --- a/src/mnode/src/mgmtDClient.c +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taoserror.h" -#include "tsched.h" -#include "tsystem.h" -#include "tutil.h" -#include "tglobal.h" -#include "dnode.h" -#include "tgrant.h" -#include "mgmtDef.h" -#include "mgmtLog.h" -#include "mgmtMnode.h" -#include "mgmtDb.h" -#include "mgmtDnode.h" -#include "mgmtProfile.h" -#include "mgmtShell.h" -#include "mgmtTable.h" -#include "mgmtVgroup.h" - -static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg); -static void (*mgmtProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void *tsMgmtDClientRpc = NULL; - -int32_t mgmtInitDClient() { - SRpcInit rpcInit = {0}; - rpcInit.localPort = 0; - rpcInit.label = "MND-DC"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = mgmtProcessRspFromDnode; - rpcInit.sessions = 100; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.user = "mgmtDClient"; - rpcInit.ckey = "key"; - rpcInit.secret = "secret"; - - tsMgmtDClientRpc = rpcOpen(&rpcInit); - if (tsMgmtDClientRpc == NULL) { - mError("failed to init client connection to dnode"); - return -1; - } - - mPrint("client connection to dnode is opened"); - return 0; -} - -void mgmtCleanupDClient() { - if (tsMgmtDClientRpc) { - rpcClose(tsMgmtDClientRpc); - tsMgmtDClientRpc = NULL; - } -} - -void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { - mgmtProcessDnodeRspFp[msgType] = fp; -} - -void mgmtSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) { - rpcSendRequest(tsMgmtDClientRpc, ipSet, rpcMsg); -} - -static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) { - if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) { - (*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg); - } else { - mError("%s is not processed in mgmt dclient", taosMsg[rpcMsg->msgType]); - SRpcMsg rpcRsp = {.pCont = 0, .contLen = 0, .code = TSDB_CODE_OPS_NOT_SUPPORT, .handle = rpcMsg->handle}; - rpcSendResponse(&rpcRsp); - } - - rpcFreeCont(rpcMsg->pCont); -} diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 90ee40dc93..33c012f079 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -27,8 +27,6 @@ #include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" -#include "mgmtDClient.h" -#include "mgmtDServer.h" #include "mgmtDnode.h" #include "mgmtMnode.h" #include "mgmtSdb.h" @@ -152,7 +150,7 @@ int32_t mgmtInitDnodes() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DNODE, mgmtProcessCreateDnodeMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp); mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules); @@ -241,7 +239,7 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) { .pCont = pMdCfgDnode, .contLen = sizeof(SMDCfgDnodeMsg) }; - mgmtSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); + dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg); rpcRsp.code = TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index 0f18c95539..aa95381df3 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -24,12 +24,11 @@ #include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" +#include "mgmtServer.h" #include "mgmtAcct.h" #include "mgmtDnode.h" #include "mgmtMnode.h" #include "mgmtDb.h" -#include "mgmtDClient.h" -#include "mgmtDServer.h" #include "mgmtSdb.h" #include "mgmtVgroup.h" #include "mgmtUser.h" @@ -100,11 +99,7 @@ int32_t mgmtStartSystem() { mError("failed to init balance") } - if (mgmtInitDClient() < 0) { - return -1; - } - - if (mgmtInitDServer() < 0) { + if (mgmtInitServer() < 0) { return -1; } @@ -141,8 +136,7 @@ void mgmtCleanUpSystem() { mgmtCleanupMnodes(); balanceCleanUp(); mgmtCleanUpShell(); - mgmtCleanupDClient(); - mgmtCleanupDServer(); + mgmtCleanupServer(); mgmtCleanUpAccts(); mgmtCleanUpTables(); mgmtCleanUpVgroups(); diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtServer.c similarity index 55% rename from src/mnode/src/mgmtDServer.c rename to src/mnode/src/mgmtServer.c index 726554e490..c2b07a3f4e 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtServer.c @@ -27,7 +27,6 @@ #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtDb.h" -#include "mgmtDServer.h" #include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtShell.h" @@ -35,45 +34,21 @@ #include "mgmtTable.h" #include "mgmtVgroup.h" -static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg); -static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg); -static void *tsMgmtDServerRpc; -static void *tsMgmtDServerQhandle = NULL; +static void *tsMgmtServerQhandle = NULL; -int32_t mgmtInitDServer() { - SRpcInit rpcInit = {0}; - rpcInit.localPort = tsMnodeDnodePort; - rpcInit.label = "MND-DS"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = mgmtProcessMsgFromDnode; - rpcInit.sessions = 100; - rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.afp = mgmtDServerRetrieveAuth; +int32_t mgmtInitServer() { - tsMgmtDServerRpc = rpcOpen(&rpcInit); - if (tsMgmtDServerRpc == NULL) { - mError("failed to init server connection to dnode"); - return -1; - } - - tsMgmtDServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS"); + tsMgmtServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS"); mPrint("server connection to dnode is opened"); return 0; } -void mgmtCleanupDServer() { - if (tsMgmtDServerQhandle) { - taosCleanUpScheduler(tsMgmtDServerQhandle); - tsMgmtDServerQhandle = NULL; - } - - if (tsMgmtDServerRpc) { - rpcClose(tsMgmtDServerRpc); - tsMgmtDServerRpc = NULL; - mPrint("server connection to dnode is closed"); +void mgmtCleanupServer() { + if (tsMgmtServerQhandle) { + taosCleanUpScheduler(tsMgmtServerQhandle); + tsMgmtServerQhandle = NULL; } } @@ -81,21 +56,27 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) { mgmtProcessDnodeMsgFp[msgType] = fp; } -static void mgmtProcessDServerRequest(SSchedMsg *sched) { +static void mgmtProcessRequestFromDnode(SSchedMsg *sched) { SRpcMsg *pMsg = sched->msg; (*mgmtProcessDnodeMsgFp[pMsg->msgType])(pMsg); rpcFreeCont(pMsg->pCont); free(pMsg); } -static void mgmtAddToDServerQueue(SRpcMsg *pMsg) { +static void mgmtAddToServerQueue(SRpcMsg *pMsg) { SSchedMsg schedMsg; schedMsg.msg = pMsg; - schedMsg.fp = mgmtProcessDServerRequest; - taosScheduleTask(tsMgmtDServerQhandle, &schedMsg); + schedMsg.fp = mgmtProcessRequestFromDnode; + taosScheduleTask(tsMgmtServerQhandle, &schedMsg); } -static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { +void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg) { + if (mgmtProcessDnodeMsgFp[rpcMsg->msgType] == NULL) { + mError("%s is not processed in mnode", taosMsg[rpcMsg->msgType]); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED); + rpcFreeCont(rpcMsg->pCont); + } + if (rpcMsg->pCont == NULL) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN); return; @@ -116,17 +97,8 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) { return; } - if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) { - SRpcMsg *pMsg = malloc(sizeof(SRpcMsg)); - memcpy(pMsg, rpcMsg, sizeof(SRpcMsg)); - mgmtAddToDServerQueue(pMsg); - } else { - mError("%s is not processed in mgmt dserver", taosMsg[rpcMsg->msgType]); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED); - rpcFreeCont(rpcMsg->pCont); - } + SRpcMsg *pMsg = malloc(sizeof(SRpcMsg)); + memcpy(pMsg, rpcMsg, sizeof(SRpcMsg)); + mgmtAddToServerQueue(pMsg); } -static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - return TSDB_CODE_SUCCESS; -} diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 6ed19b3d11..c6d973fd81 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -24,13 +24,12 @@ #include "tname.h" #include "tidpool.h" #include "tglobal.h" +#include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtAcct.h" -#include "mgmtDClient.h" #include "mgmtDb.h" #include "mgmtDnode.h" -#include "mgmtDServer.h" #include "tgrant.h" #include "mgmtMnode.h" #include "mgmtProfile.h" @@ -538,10 +537,10 @@ int32_t mgmtInitTables() { mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg); mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp); mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg); @@ -810,7 +809,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) { if (pVgroup != NULL) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE}; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&ipSet, &rpcMsg); mgmtDecVgroupRef(pVgroup); } } @@ -1487,7 +1486,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&ipSet, &rpcMsg); } static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { @@ -1525,7 +1524,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) { .msgType = TSDB_MSG_TYPE_MD_DROP_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcMsg); + dnodeSendMsgToDnode(&ipSet, &rpcMsg); } static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagName, char *nContent) { @@ -1827,7 +1826,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE }; - mgmtSendMsgToDnode(&ipSet, &rpcRsp); + dnodeSendMsgToDnode(&ipSet, &rpcRsp); mgmtDecTableRef(pTable); mgmtDecDnodeRef(pDnode); diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index d8007d000d..4b455de41a 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -23,11 +23,10 @@ #include "ttime.h" #include "tbalance.h" #include "tglobal.h" +#include "dnode.h" #include "mgmtDef.h" #include "mgmtLog.h" #include "mgmtDb.h" -#include "mgmtDClient.h" -#include "mgmtDServer.h" #include "mgmtDnode.h" #include "mgmtMnode.h" #include "mgmtProfile.h" @@ -220,8 +219,8 @@ int32_t mgmtInitVgroups() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); - mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp); + dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp); mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg); mTrace("table:vgroups is created"); @@ -583,7 +582,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) { }; for (int i = 0; i < pVgroup->numOfVnodes; ++i) { strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn); - ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEMNODE; + ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE; } return ipSet; } @@ -594,7 +593,7 @@ SRpcIpSet mgmtGetIpSetFromIp(char *ep) { ipSet.numOfIps = 1; ipSet.inUse = 0; taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]); - ipSet.port[0] += TSDB_PORT_DNODEMNODE; + ipSet.port[0] += TSDB_PORT_DNODEDNODE; return ipSet; } @@ -608,7 +607,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE }; - mgmtSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(ipSet, &rpcMsg); } void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) { @@ -674,7 +673,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) { .code = 0, .msgType = TSDB_MSG_TYPE_MD_DROP_VNODE }; - mgmtSendMsgToDnode(ipSet, &rpcMsg); + dnodeSendMsgToDnode(ipSet, &rpcMsg); } static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) { -- GitLab