提交 d58c42b4 编写于 作者: J jtao1735

working version

上级 b6225968
......@@ -77,7 +77,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.numOfIps = 1;
strcpy(tscMgmtIpSet.fqdn[0], ip);
tscMgmtIpSet.port[0] = port? port: tsMnodeShellPort;
tscMgmtIpSet.port[0] = port? port: tsDnodeShellPort;
} else {
if (tsFirst[0] != 0) {
taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
......@@ -100,7 +100,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strncpy(pObj->user, user, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
pObj->mgmtPort = port ? port : tsMnodeShellPort;
pObj->mgmtPort = port ? port : tsDnodeShellPort;
if (db) {
int32_t len = strlen(db);
......
......@@ -56,7 +56,7 @@ int32_t tscInitRpc(const char *user, const char *secret) {
if (pDnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC-vnode";
rpcInit.label = "TSC";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
......
......@@ -55,10 +55,8 @@ extern char tsFirst[];
extern char tsSecond[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
extern uint16_t tsMnodeDnodePort;
extern uint16_t tsMnodeShellPort;
extern uint16_t tsDnodeShellPort;
extern uint16_t tsDnodeMnodePort;
extern uint16_t tsDnodeDnodePort;
extern uint16_t tsSyncPort;
extern int32_t tsStatusInterval;
......
......@@ -66,11 +66,9 @@ char tsSecond[TSDB_FQDN_LEN] = {0};
char tsArbitrator[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030;
uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030]
uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
uint16_t tsMnodeDnodePort = 6040; // udp/tcp
uint16_t tsDnodeMnodePort = 6045; // udp/tcp
uint16_t tsSyncPort = 6050;
uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
uint16_t tsDnodeDnodePort = 6035; // udp/tcp
uint16_t tsSyncPort = 6040;
int32_t tsStatusInterval = 1; // second
int32_t tsShellActivityTimer = 3; // second
......@@ -1245,8 +1243,7 @@ bool taosCheckGlobalCfg() {
tsVersion = 10 * tsVersion;
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp
tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp
tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
return true;
......
......@@ -26,10 +26,10 @@
#include "tcq.h"
#include "taos.h"
#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);}
#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
#define cPrint(...) {taosPrintLog("CQ ", 255, __VA_ARGS__);}
typedef struct {
int vgId;
......
......@@ -13,16 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MCLIENT_H
#define TDENGINE_DNODE_MCLIENT_H
#ifndef TDENGINE_DNODE_DNODE_H
#define TDENGINE_DNODE_DNODE_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitMClient();
void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
int32_t dnodeInitServer();
void dnodeCleanupServer();
int32_t dnodeInitClient();
void dnodeCleanupClient();
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_DNODE_MNODE_H
#define TDENGINE_DNODE_MNODE_H
#ifdef __cplusplus
extern "C" {
#endif
int32_t dnodeInitMnode();
void dnodeCleanupMnode();
#ifdef __cplusplus
}
#endif
#endif
......@@ -7,7 +7,6 @@
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
......@@ -15,73 +14,64 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tsched.h"
#include "tsystem.h"
#include "taosmsg.h"
#include "trpc.h"
#include "tutil.h"
#include "tglobal.h"
#include "dnode.h"
#include "tgrant.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
#include "dnodeLog.h"
#include "dnodeMgmt.h"
static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg);
static void (*mgmtProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void *tsMgmtDClientRpc = NULL;
static void *tsDnodeClientRpc;
static void (*dnodeProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg);
extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
int32_t mgmtInitDClient() {
SRpcInit rpcInit = {0};
rpcInit.localPort = 0;
rpcInit.label = "MND-DC";
int32_t dnodeInitClient() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1;
rpcInit.cfp = mgmtProcessRspFromDnode;
rpcInit.cfp = dnodeProcessRspFromDnode;
rpcInit.ufp = dnodeUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "mgmtDClient";
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = "secret";
tsMgmtDClientRpc = rpcOpen(&rpcInit);
if (tsMgmtDClientRpc == NULL) {
mError("failed to init client connection to dnode");
tsDnodeClientRpc = rpcOpen(&rpcInit);
if (tsDnodeClientRpc == NULL) {
dError("failed to init mnode rpc client");
return -1;
}
mPrint("client connection to dnode is opened");
dPrint("inter-dndoes rpc client is opened");
return 0;
}
void mgmtCleanupDClient() {
if (tsMgmtDClientRpc) {
rpcClose(tsMgmtDClientRpc);
tsMgmtDClientRpc = NULL;
void dnodeCleanupClient() {
if (tsDnodeClientRpc) {
rpcClose(tsDnodeClientRpc);
tsDnodeClientRpc = NULL;
dPrint("inter-dnodes rpc client is closed");
}
}
void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
mgmtProcessDnodeRspFp[msgType] = fp;
static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) {
if (dnodeProcessDnodeRspFp[pMsg->msgType]) {
(*dnodeProcessDnodeRspFp[pMsg->msgType])(pMsg);
} else {
dError("%s is not processed", taosMsg[pMsg->msgType]);
}
rpcFreeCont(pMsg->pCont);
}
void mgmtSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsMgmtDClientRpc, ipSet, rpcMsg);
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
dnodeProcessDnodeRspFp[msgType] = fp;
}
static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) {
(*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg);
} else {
mError("%s is not processed in mgmt dclient", taosMsg[rpcMsg->msgType]);
SRpcMsg rpcRsp = {.pCont = 0, .contLen = 0, .code = TSDB_CODE_OPS_NOT_SUPPORT, .handle = rpcMsg->handle};
rpcSendResponse(&rpcRsp);
}
rpcFreeCont(rpcMsg->pCont);
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg);
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "taosmsg.h"
#include "trpc.h"
#include "tutil.h"
#include "tsync.h"
#include "ttime.h"
#include "ttimer.h"
#include "tbalance.h"
#include "tglobal.h"
#include "vnode.h"
#include "mnode.h"
#include "dnode.h"
#include "dnodeLog.h"
#include "dnodeMClient.h"
#include "dnodeModule.h"
#include "dnodeMgmt.h"
#define MPEER_CONTENT_LEN 2000
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
static bool dnodeReadMnodeInfos();
static void dnodeSaveMnodeInfos();
static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
static bool dnodeReadDnodeCfg();
static void dnodeSaveDnodeCfg();
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void *tsDnodeMClientRpc = NULL;
static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static SRpcIpSet tsMnodeIpSet = {0};
static SDMMnodeInfos tsMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0};
void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
dTrace("mgmt IP list is changed for ufp is called");
tsMnodeIpSet = *pIpSet;
}
void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) {
SRpcIpSet *ipSet = ipSetRaw;
ipSet->numOfIps = tsMnodeInfos.nodeNum;
ipSet->inUse = tsMnodeInfos.inUse;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]);
ipSet->port[i] += TSDB_PORT_MNODEDNODE;
}
}
int32_t dnodeInitMClient() {
dnodeReadDnodeCfg();
tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
if (!dnodeReadMnodeInfos()) {
memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsMnodeIpSet.numOfIps = 1;
taosGetFqdnPortFromEp(tsFirst, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]);
tsMnodeIpSet.port[0] += TSDB_PORT_MNODEDNODE;
if (strcmp(tsSecond, tsFirst) != 0) {
tsMnodeIpSet.numOfIps = 2;
taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]);
tsMnodeIpSet.port[1] += TSDB_PORT_MNODEDNODE;
}
} else {
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE;
}
}
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-MC";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromMnode;
rpcInit.ufp = dnodeUpdateIpSet;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 2000;
rpcInit.user = "t";
rpcInit.ckey = "key";
rpcInit.secret = "secret";
tsDnodeMClientRpc = rpcOpen(&rpcInit);
if (tsDnodeMClientRpc == NULL) {
dError("failed to init mnode rpc client");
return -1;
}
tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dPrint("mnode rpc client is opened");
return 0;
}
void dnodeCleanupMClient() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
if (tsDnodeMClientRpc) {
rpcClose(tsDnodeMClientRpc);
tsDnodeMClientRpc = NULL;
dPrint("mnode rpc client is closed");
}
}
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
if (tsDnodeProcessMgmtRspFp[pMsg->msgType]) {
(*tsDnodeProcessMgmtRspFp[pMsg->msgType])(pMsg);
} else {
dError("%s is not processed in dnode mclient", taosMsg[pMsg->msgType]);
SRpcMsg rpcRsp = {.pCont = 0, .contLen = 0, .code = TSDB_CODE_OPS_NOT_SUPPORT, .handle = pMsg->handle};
rpcSendResponse(&rpcRsp);
}
rpcFreeCont(pMsg->pCont);
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SDMStatusRsp *pStatusRsp = pMsg->pCont;
SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
if (pMnodes->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pCfg->dnodeId = htonl(pCfg->dnodeId);
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
}
SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) {
pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId);
}
dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
dnodeUpdateMnodeInfos(pMnodes);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0);
if (!(mnodesChanged || mnodesNotInit)) return;
memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE;
}
dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
}
dnodeSaveMnodeInfos();
sdbUpdateSync();
}
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
if (tsDnodeMClientRpc) {
rpcSendRequest(tsDnodeMClientRpc, &tsMnodeIpSet, rpcMsg);
}
}
static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r");
if (!fp) {
dTrace("failed to read mnode mgmtIpList.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 2000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read mnode mgmtIpList.json, content is null");
return false;
}
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read mnode mgmtIpList.json, invalid json format");
goto PARSE_OVER;
}
cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, inUse not found");
goto PARSE_OVER;
}
tsMnodeInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodeNum not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnode mgmtIpList.json, nodeInfos not found");
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != tsMnodeInfos.nodeNum) {
dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched");
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodeId not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeName not found");
goto PARSE_OVER;
}
strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN);
}
ret = true;
dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMnodeInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMnodeInfos.nodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp);
if (i < tsMnodeInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fclose(fp);
free(content);
dPrint("save mnode iplist successed");
}
char *dnodeGetMnodeMasterEp() {
return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp;
}
void* dnodeGetMnodeInfos() {
return &tsMnodeInfos;
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
return;
}
if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer");
return;
}
int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SDMStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
//strcpy(pStatus->dnodeName, tsDnodeName);
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
strcpy(pStatus->dnodeEp, tsLocalEp);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
.msgType = TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToMnode(&rpcMsg);
}
static bool dnodeReadDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "r");
if (!fp) {
dTrace("failed to read dnodeCfg.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read dnodeCfg.json, content is null");
return false;
}
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read dnodeCfg.json, invalid json format");
goto PARSE_CFG_OVER;
}
cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read dnodeCfg.json, dnodeId not found");
goto PARSE_CFG_OVER;
}
tsDnodeCfg.dnodeId = dnodeId->valueint;
ret = true;
dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
PARSE_CFG_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fclose(fp);
free(content);
dPrint("save dnodeId successed");
}
void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
if (tsDnodeCfg.dnodeId == 0) {
dPrint("dnodeId is set to %d", pCfg->dnodeId);
tsDnodeCfg.dnodeId = pCfg->dnodeId;
dnodeSaveDnodeCfg();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId;
}
......@@ -23,9 +23,8 @@
#include "tglobal.h"
#include "dnode.h"
#include "dnodeLog.h"
#include "dnodeMClient.h"
#include "dnodeMgmt.h"
#include "dnodeMnode.h"
#include "dnodeDnode.h"
#include "dnodeModule.h"
#include "dnodeRead.h"
#include "dnodeShell.h"
......@@ -167,9 +166,9 @@ static int32_t dnodeInitSystem() {
if (dnodeInitStorage() != 0) return -1;
if (dnodeInitRead() != 0) return -1;
if (dnodeInitWrite() != 0) return -1;
if (dnodeInitMClient() != 0) return -1;
if (dnodeInitClient() != 0) return -1;
if (dnodeInitModules() != 0) return -1;
if (dnodeInitMnode() != 0) return -1;
if (dnodeInitServer() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitShell() != 0) return -1;
......@@ -185,9 +184,9 @@ static void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell();
dnodeCleanupMnode();
dnodeCleanupServer();
dnodeCleanupMgmt();
dnodeCleanupMClient();
dnodeCleanupClient();
dnodeCleanupWrite();
dnodeCleanupRead();
dnodeCleanUpModules();
......
......@@ -15,19 +15,47 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "cJSON.h"
#include "ihash.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "ttime.h"
#include "ttimer.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "vnode.h"
#include "tsync.h"
#include "ttime.h"
#include "ttimer.h"
#include "tbalance.h"
#include "tglobal.h"
#include "dnode.h"
#include "vnode.h"
#include "mnode.h"
#include "dnodeLog.h"
#include "dnodeMClient.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
#include "dnodeModule.h"
#define MPEER_CONTENT_LEN 2000
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
static bool dnodeReadMnodeInfos();
static void dnodeSaveMnodeInfos();
static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
static bool dnodeReadDnodeCfg();
static void dnodeSaveDnodeCfg();
static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void dnodeSendStatusMsg(void *handle, void *tmrId);
static void *tsDnodeTmr = NULL;
static void *tsStatusTimer = NULL;
static uint32_t tsRebootTime;
static SRpcIpSet tsMnodeIpSet = {0};
static SDMMnodeInfos tsMnodeInfos = {0};
static SDMDnodeCfg tsDnodeCfg = {0};
static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes();
......@@ -43,15 +71,59 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
dnodeReadDnodeCfg();
tsRebootTime = taosGetTimestampSec();
tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
if (tsDnodeTmr == NULL) {
dError("failed to init dnode timer");
return -1;
}
if (!dnodeReadMnodeInfos()) {
memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet));
memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
tsMnodeIpSet.numOfIps = 1;
taosGetFqdnPortFromEp(tsFirst, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]);
tsMnodeIpSet.port[0] += TSDB_PORT_DNODEDNODE;
if (strcmp(tsSecond, tsFirst) != 0) {
tsMnodeIpSet.numOfIps = 2;
taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]);
tsMnodeIpSet.port[1] += TSDB_PORT_DNODEDNODE;
}
} else {
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
tsMnodeIpSet.port[i] += TSDB_PORT_DNODEDNODE;
}
}
int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
dPrint("dnode mgmt is initialized");
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupMgmt() {
if (tsStatusTimer != NULL) {
taosTmrStopA(&tsStatusTimer);
tsStatusTimer = NULL;
}
if (tsDnodeTmr != NULL) {
taosTmrCleanUp(tsDnodeTmr);
tsDnodeTmr = NULL;
}
dnodeCloseVnodes();
}
......@@ -193,3 +265,326 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
dTrace("mgmt IP list is changed for ufp is called");
tsMnodeIpSet = *pIpSet;
}
void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) {
SRpcIpSet *ipSet = ipSetRaw;
ipSet->numOfIps = tsMnodeInfos.nodeNum;
ipSet->inUse = tsMnodeInfos.inUse;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]);
ipSet->port[i] += TSDB_PORT_DNODEDNODE;
}
}
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code));
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SDMStatusRsp *pStatusRsp = pMsg->pCont;
SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
if (pMnodes->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
return;
}
SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
pCfg->moduleStatus = htonl(pCfg->moduleStatus);
pCfg->dnodeId = htonl(pCfg->dnodeId);
for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
}
SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) {
pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId);
}
dnodeProcessModuleStatus(pCfg->moduleStatus);
dnodeUpdateDnodeCfg(pCfg);
dnodeUpdateMnodeInfos(pMnodes);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
}
static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0);
if (!(mnodesChanged || mnodesNotInit)) return;
memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
tsMnodeIpSet.port[i] += TSDB_PORT_DNODEDNODE;
}
dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
}
dnodeSaveMnodeInfos();
sdbUpdateSync();
}
static bool dnodeReadMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r");
if (!fp) {
dTrace("failed to read mnode mgmtIpList.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 2000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read mnode mgmtIpList.json, content is null");
return false;
}
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read mnode mgmtIpList.json, invalid json format");
goto PARSE_OVER;
}
cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (!inUse || inUse->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, inUse not found");
goto PARSE_OVER;
}
tsMnodeInfos.inUse = inUse->valueint;
cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (!nodeNum || nodeNum->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodeNum not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeNum = nodeNum->valueint;
cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
dError("failed to read mnode mgmtIpList.json, nodeInfos not found");
goto PARSE_OVER;
}
int size = cJSON_GetArraySize(nodeInfos);
if (size != tsMnodeInfos.nodeNum) {
dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched");
goto PARSE_OVER;
}
for (int i = 0; i < size; ++i) {
cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
if (nodeInfo == NULL) continue;
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodeId not found");
goto PARSE_OVER;
}
tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeName not found");
goto PARSE_OVER;
}
strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN);
}
ret = true;
dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
}
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveMnodeInfos() {
char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMnodeInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMnodeInfos.nodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp);
if (i < tsMnodeInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
}
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fclose(fp);
free(content);
dPrint("save mnode iplist successed");
}
char *dnodeGetMnodeMasterEp() {
return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp;
}
void* dnodeGetMnodeInfos() {
return &tsMnodeInfos;
}
static void dnodeSendStatusMsg(void *handle, void *tmrId) {
if (tsDnodeTmr == NULL) {
dError("dnode timer is already released");
return;
}
if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer");
return;
}
int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SDMStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message");
return;
}
//strcpy(pStatus->dnodeName, tsDnodeName);
pStatus->version = htonl(tsVersion);
pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
strcpy(pStatus->dnodeEp, tsLocalEp);
pStatus->lastReboot = htonl(tsRebootTime);
pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
pStatus->diskAvailable = tsAvailDataDirGB;
pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
vnodeBuildStatusMsg(pStatus);
contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
pStatus->openVnodes = htons(pStatus->openVnodes);
SRpcMsg rpcMsg = {
.pCont = pStatus,
.contLen = contLen,
.msgType = TSDB_MSG_TYPE_DM_STATUS
};
dnodeSendMsgToDnode(&tsMnodeIpSet, &rpcMsg);
}
static bool dnodeReadDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "r");
if (!fp) {
dTrace("failed to read dnodeCfg.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read dnodeCfg.json, content is null");
return false;
}
cJSON* root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read dnodeCfg.json, invalid json format");
goto PARSE_CFG_OVER;
}
cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
if (!dnodeId || dnodeId->type != cJSON_Number) {
dError("failed to read dnodeCfg.json, dnodeId not found");
goto PARSE_CFG_OVER;
}
tsDnodeCfg.dnodeId = dnodeId->valueint;
ret = true;
dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
PARSE_CFG_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
}
static void dnodeSaveDnodeCfg() {
char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
FILE *fp = fopen(dnodeCfgFile, "w");
if (!fp) return;
int32_t len = 0;
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId);
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fclose(fp);
free(content);
dPrint("save dnodeId successed");
}
void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
if (tsDnodeCfg.dnodeId == 0) {
dPrint("dnodeId is set to %d", pCfg->dnodeId);
tsDnodeCfg.dnodeId = pCfg->dnodeId;
dnodeSaveDnodeCfg();
}
}
int32_t dnodeGetDnodeId() {
return tsDnodeCfg.dnodeId;
}
......@@ -23,10 +23,10 @@
#include "dnodeWrite.h"
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg);
static void *tsDnodeMnodeRpc = NULL;
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg);
static void *tsDnodeServerRpc = NULL;
int32_t dnodeInitMnode() {
int32_t dnodeInitServer() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite;
......@@ -38,33 +38,35 @@ int32_t dnodeInitMnode() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeMnodePort;
rpcInit.label = "DND-MS";
rpcInit.localPort = tsDnodeDnodePort;
rpcInit.label = "DND-S";
rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessMsgFromMnode;
rpcInit.cfp = dnodeProcessReqMsgFromDnode;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000;
tsDnodeMnodeRpc = rpcOpen(&rpcInit);
if (tsDnodeMnodeRpc == NULL) {
dError("failed to init mnode rpc server");
tsDnodeServerRpc = rpcOpen(&rpcInit);
if (tsDnodeServerRpc == NULL) {
dError("failed to init inter-dnodes RPC server");
return -1;
}
dPrint("mnode rpc server is opened");
dPrint("inter-dnodes RPC server is opened");
return 0;
}
void dnodeCleanupMnode() {
if (tsDnodeMnodeRpc) {
rpcClose(tsDnodeMnodeRpc);
tsDnodeMnodeRpc = NULL;
dPrint("mnode rpc server is closed");
void dnodeCleanupServer() {
if (tsDnodeServerRpc) {
rpcClose(tsDnodeServerRpc);
tsDnodeServerRpc = NULL;
dPrint("inter-dnodes RPC server is closed");
}
}
static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) {
SRpcMsg rspMsg;
rspMsg.handle = pMsg->handle;
rspMsg.pCont = NULL;
......@@ -74,7 +76,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
rspMsg.code = TSDB_CODE_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle);
dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle);
return;
}
......@@ -83,15 +85,11 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
rpcSendResponse(&rspMsg);
return;
}
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
dError("%s is not processed in dnode mserver", taosMsg[pMsg->msgType]);
rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
mgmtProcessReqMsgFromDnode(pMsg);
}
}
......@@ -49,7 +49,7 @@ int32_t dnodeInitShell() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsMnodeShellPort;
rpcInit.localPort = tsDnodeShellPort;
rpcInit.label = "SHELL";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessMsgFromShell;
......
......@@ -20,6 +20,8 @@
extern "C" {
#endif
#include "trpc.h"
typedef struct {
int32_t queryReqNum;
int32_t submitReqNum;
......@@ -47,6 +49,10 @@ void dnodeGetMnodeDnodeIpSet(void *ipSet);
void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -26,7 +26,8 @@ void mgmtCleanUpSystem();
void mgmtStopSystem();
void sdbUpdateSync();
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
......
......@@ -330,9 +330,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_CHILD_TABLES 100000
#define TSDB_PORT_DNODESHELL 0
#define TSDB_PORT_DNODEMNODE 10
#define TSDB_PORT_MNODEDNODE 15
#define TSDB_PORT_SYNC 20
#define TSDB_PORT_DNODEDNODE 5
#define TSDB_PORT_SYNC 10
#define TAOS_QTYPE_RPC 0
#define TAOS_QTYPE_FWD 1
......
......@@ -68,7 +68,7 @@ TAOS *shellInit(struct arguments *args) {
tsMeterMetaKeepTimer = 3000;
// Connect to the database.
TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort);
TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort);
if (con == NULL) {
return con;
}
......
......@@ -229,7 +229,7 @@ static void shellRunImportThreads(struct arguments* args)
ShellThreadObj *pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = args->threadNum;
pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort);
pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort);
if (pThread->taos == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos));
exit(0);
......
......@@ -63,7 +63,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break;
case 'P':
if (arg) {
tsMnodeShellPort = atoi(arg);
tsDnodeShellPort = atoi(arg);
} else {
fprintf(stderr, "Invalid port\n");
return -1;
......
......@@ -20,8 +20,8 @@
extern "C" {
#endif
int32_t mgmtInitDServer();
void mgmtCleanupDServer();
int32_t mgmtInitServer();
void mgmtCleanupServer();
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
#ifdef __cplusplus
......
......@@ -27,8 +27,6 @@
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDClient.h"
#include "mgmtDServer.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
......@@ -152,7 +150,7 @@ int32_t mgmtInitDnodes() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DNODE, mgmtProcessCreateDnodeMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
......@@ -241,7 +239,7 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
.pCont = pMdCfgDnode,
.contLen = sizeof(SMDCfgDnodeMsg)
};
mgmtSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
rpcRsp.code = TSDB_CODE_SUCCESS;
}
......
......@@ -24,12 +24,11 @@
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtServer.h"
#include "mgmtAcct.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDServer.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
#include "mgmtUser.h"
......@@ -100,11 +99,7 @@ int32_t mgmtStartSystem() {
mError("failed to init balance")
}
if (mgmtInitDClient() < 0) {
return -1;
}
if (mgmtInitDServer() < 0) {
if (mgmtInitServer() < 0) {
return -1;
}
......@@ -141,8 +136,7 @@ void mgmtCleanUpSystem() {
mgmtCleanupMnodes();
balanceCleanUp();
mgmtCleanUpShell();
mgmtCleanupDClient();
mgmtCleanupDServer();
mgmtCleanupServer();
mgmtCleanUpAccts();
mgmtCleanUpTables();
mgmtCleanUpVgroups();
......
......@@ -27,7 +27,6 @@
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
#include "mgmtDServer.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
......@@ -35,45 +34,21 @@
#include "mgmtTable.h"
#include "mgmtVgroup.h"
static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg);
static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
static void *tsMgmtDServerRpc;
static void *tsMgmtDServerQhandle = NULL;
static void *tsMgmtServerQhandle = NULL;
int32_t mgmtInitDServer() {
SRpcInit rpcInit = {0};
rpcInit.localPort = tsMnodeDnodePort;
rpcInit.label = "MND-DS";
rpcInit.numOfThreads = 1;
rpcInit.cfp = mgmtProcessMsgFromDnode;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = mgmtDServerRetrieveAuth;
int32_t mgmtInitServer() {
tsMgmtDServerRpc = rpcOpen(&rpcInit);
if (tsMgmtDServerRpc == NULL) {
mError("failed to init server connection to dnode");
return -1;
}
tsMgmtDServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS");
tsMgmtServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS");
mPrint("server connection to dnode is opened");
return 0;
}
void mgmtCleanupDServer() {
if (tsMgmtDServerQhandle) {
taosCleanUpScheduler(tsMgmtDServerQhandle);
tsMgmtDServerQhandle = NULL;
}
if (tsMgmtDServerRpc) {
rpcClose(tsMgmtDServerRpc);
tsMgmtDServerRpc = NULL;
mPrint("server connection to dnode is closed");
void mgmtCleanupServer() {
if (tsMgmtServerQhandle) {
taosCleanUpScheduler(tsMgmtServerQhandle);
tsMgmtServerQhandle = NULL;
}
}
......@@ -81,21 +56,27 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
mgmtProcessDnodeMsgFp[msgType] = fp;
}
static void mgmtProcessDServerRequest(SSchedMsg *sched) {
static void mgmtProcessRequestFromDnode(SSchedMsg *sched) {
SRpcMsg *pMsg = sched->msg;
(*mgmtProcessDnodeMsgFp[pMsg->msgType])(pMsg);
rpcFreeCont(pMsg->pCont);
free(pMsg);
}
static void mgmtAddToDServerQueue(SRpcMsg *pMsg) {
static void mgmtAddToServerQueue(SRpcMsg *pMsg) {
SSchedMsg schedMsg;
schedMsg.msg = pMsg;
schedMsg.fp = mgmtProcessDServerRequest;
taosScheduleTask(tsMgmtDServerQhandle, &schedMsg);
schedMsg.fp = mgmtProcessRequestFromDnode;
taosScheduleTask(tsMgmtServerQhandle, &schedMsg);
}
static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg) {
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType] == NULL) {
mError("%s is not processed in mnode", taosMsg[rpcMsg->msgType]);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED);
rpcFreeCont(rpcMsg->pCont);
}
if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
return;
......@@ -116,17 +97,8 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
return;
}
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
memcpy(pMsg, rpcMsg, sizeof(SRpcMsg));
mgmtAddToDServerQueue(pMsg);
} else {
mError("%s is not processed in mgmt dserver", taosMsg[rpcMsg->msgType]);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED);
rpcFreeCont(rpcMsg->pCont);
}
SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
memcpy(pMsg, rpcMsg, sizeof(SRpcMsg));
mgmtAddToServerQueue(pMsg);
}
static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return TSDB_CODE_SUCCESS;
}
......@@ -24,13 +24,12 @@
#include "tname.h"
#include "tidpool.h"
#include "tglobal.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "tgrant.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
......@@ -538,10 +537,10 @@ int32_t mgmtInitTables() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg);
......@@ -810,7 +809,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
if (pVgroup != NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
mgmtDecVgroupRef(pVgroup);
}
}
......@@ -1487,7 +1486,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
}
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
......@@ -1525,7 +1524,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
}
static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagName, char *nContent) {
......@@ -1827,7 +1826,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
mgmtSendMsgToDnode(&ipSet, &rpcRsp);
dnodeSendMsgToDnode(&ipSet, &rpcRsp);
mgmtDecTableRef(pTable);
mgmtDecDnodeRef(pDnode);
......
......@@ -23,11 +23,10 @@
#include "ttime.h"
#include "tbalance.h"
#include "tglobal.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDServer.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
......@@ -220,8 +219,8 @@ int32_t mgmtInitVgroups() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg);
mTrace("table:vgroups is created");
......@@ -583,7 +582,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
};
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn);
ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEMNODE;
ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE;
}
return ipSet;
}
......@@ -594,7 +593,7 @@ SRpcIpSet mgmtGetIpSetFromIp(char *ep) {
ipSet.numOfIps = 1;
ipSet.inUse = 0;
taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]);
ipSet.port[0] += TSDB_PORT_DNODEMNODE;
ipSet.port[0] += TSDB_PORT_DNODEDNODE;
return ipSet;
}
......@@ -608,7 +607,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
};
mgmtSendMsgToDnode(ipSet, &rpcMsg);
dnodeSendMsgToDnode(ipSet, &rpcMsg);
}
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
......@@ -674,7 +673,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
};
mgmtSendMsgToDnode(ipSet, &rpcMsg);
dnodeSendMsgToDnode(ipSet, &rpcMsg);
}
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册