提交 71607ccb 编写于 作者: S slguan

[TD-52] update status message

上级 78f7134e
...@@ -24,6 +24,7 @@ int32_t dnodeInitMClient(); ...@@ -24,6 +24,7 @@ int32_t dnodeInitMClient();
void dnodeCleanupMClient(); void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
uint32_t dnodeGetMnodeMasteIp(); uint32_t dnodeGetMnodeMasteIp();
void * dnodeGetMpeerInfos();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -15,33 +15,46 @@ ...@@ -15,33 +15,46 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "cJSON.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "tutil.h" #include "tutil.h"
#include "tsync.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#define MPEER_CONTENT_LEN 2000
static bool dnodeReadMnodeIpList(); static bool dnodeReadMnodeIpList();
static void dnodeSaveMnodeIpList(); static void dnodeSaveMnodeIpList();
static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
static void *tsDnodeMClientRpc = NULL; static void *tsDnodeMClientRpc = NULL;
static SRpcIpSet tsDnodeMnodeIpList = {0}; static SRpcIpSet tsMpeerIpList = {0};
static SDMNodeInfos tsMpeerInfos = {0};
int32_t dnodeInitMClient() { int32_t dnodeInitMClient() {
if (!dnodeReadMnodeIpList()) { if (!dnodeReadMnodeIpList()) {
dTrace("failed to read mnode iplist, set it from cfg file"); memset(&tsMpeerIpList, 0, sizeof(SRpcIpSet));
memset(&tsDnodeMnodeIpList, 0, sizeof(SRpcIpSet)); memset(&tsMpeerInfos, 0, sizeof(SDMNodeInfos));
tsDnodeMnodeIpList.port = tsMnodeDnodePort; tsMpeerIpList.port = tsMnodeDnodePort;
tsDnodeMnodeIpList.numOfIps = 1; tsMpeerIpList.numOfIps = 1;
tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp); tsMpeerIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) { if (tsSecondIp[0]) {
tsDnodeMnodeIpList.numOfIps = 2; tsMpeerIpList.numOfIps = 2;
tsDnodeMnodeIpList.ip[1] = inet_addr(tsSecondIp); tsMpeerIpList.ip[1] = inet_addr(tsSecondIp);
}
} else {
SRpcIpSet mgmtIpSet = {0};
tsMpeerIpList.inUse = tsMpeerInfos.inUse;
tsMpeerIpList.numOfIps = tsMpeerInfos.nodeNum;
tsMpeerIpList.port = tsMpeerInfos.nodeInfos[0].nodePort;
for (int32_t i = 0; i < tsMpeerInfos.nodeNum; i++) {
tsMpeerIpList.ip[i] = tsMpeerInfos.nodeInfos[i].nodeIp;
} }
} }
...@@ -96,23 +109,31 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -96,23 +109,31 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
} }
SDMStatusRsp *pStatusRsp = pMsg->pCont; SDMStatusRsp *pStatusRsp = pMsg->pCont;
if (pStatusRsp->ipList.numOfIps <= 0) { SDMNodeInfos *mpeers = &pStatusRsp->mpeers;
dError("status msg is invalid, num of ips is %d", pStatusRsp->ipList.numOfIps); if (mpeers->nodeNum <= 0) {
dError("status msg is invalid, num of ips is %d", mpeers->nodeNum);
return; return;
} }
pStatusRsp->ipList.port = htons(pStatusRsp->ipList.port); SRpcIpSet mgmtIpSet = {0};
for (int32_t i = 0; i < pStatusRsp->ipList.numOfIps; ++i) { mgmtIpSet.inUse = mpeers->inUse;
pStatusRsp->ipList.ip[i] = htonl(pStatusRsp->ipList.ip[i]); mgmtIpSet.numOfIps = mpeers->nodeNum;
mgmtIpSet.port = htons(mpeers->nodeInfos[0].nodePort);
for (int32_t i = 0; i < mpeers->nodeNum; i++) {
mgmtIpSet.ip[i] = htonl(mpeers->nodeInfos[i].nodeIp);
} }
//dTrace("status msg is received, result:%s", tstrerror(pMsg->code)); if (memcmp(&mgmtIpSet, &tsMpeerIpList, sizeof(SRpcIpSet)) != 0) {
memcpy(&tsMpeerIpList, &mgmtIpSet, sizeof(SRpcIpSet));
if (memcmp(&(pStatusRsp->ipList), &tsDnodeMnodeIpList, sizeof(SRpcIpSet)) != 0) { memcpy(&tsMpeerInfos, mpeers, sizeof(SDMNodeInfos));
dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", pStatusRsp->ipList.numOfIps, pStatusRsp->ipList.inUse); dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMpeerInfos.nodeNum, tsMpeerInfos.inUse);
memcpy(&tsDnodeMnodeIpList, &pStatusRsp->ipList, sizeof(SRpcIpSet)); for (int32_t i = 0; i < mpeers->nodeNum; i++) {
for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; ++i) { tsMpeerInfos.nodeInfos[i].nodeId = htonl(mpeers->nodeInfos[i].nodeId);
dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); tsMpeerInfos.nodeInfos[i].nodeIp = htonl(mpeers->nodeInfos[i].nodeIp);
tsMpeerInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort);
dPrint("mnode:%d, ip:%s:%u name:%s", tsMpeerInfos.nodeInfos[i].nodeId,
taosIpStr(tsMpeerInfos.nodeInfos[i].nodeId), tsMpeerInfos.nodeInfos[i].nodePort,
tsMpeerInfos.nodeInfos[i].nodeName);
} }
dnodeSaveMnodeIpList(); dnodeSaveMnodeIpList();
} }
...@@ -129,70 +150,148 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -129,70 +150,148 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
if (tsDnodeMClientRpc) { if (tsDnodeMClientRpc) {
rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg); rpcSendRequest(tsDnodeMClientRpc, &tsMpeerIpList, rpcMsg);
} }
} }
static bool dnodeReadMnodeIpList() { static bool dnodeReadMnodeIpList() {
char ipFile[TSDB_FILENAME_LEN] = {0}; char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/iplist", tsDnodeDir); sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "r"); FILE *fp = fopen(ipFile, "r");
if (!fp) return false; if (!fp) {
dTrace("failed to read mnode mgmtIpList.json, file not exist");
return false;
}
bool ret = false;
int maxLen = 2000;
char *content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dError("failed to read mnode mgmtIpList.json, content is null");
return false;
}
char option[32] = {0}; cJSON* root = cJSON_Parse(content);
int32_t value = 0; if (root == NULL) {
int32_t num = 0; dError("failed to read mnode mgmtIpList.json, invalid json format");
goto PARSE_OVER;
}
num = fscanf(fp, "%s %d", option, &value); cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
if (num != 2) return false; if (!inUse || inUse->type != cJSON_Number) {
if (strcmp(option, "inUse") != 0) return false; dError("failed to read mnode mgmtIpList.json, inUse not found");
tsDnodeMnodeIpList.inUse = (int8_t)value;; goto PARSE_OVER;
}
tsMpeerInfos.inUse = inUse->valueint;
num = fscanf(fp, "%s %d", option, &value); cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
if (num != 2) return false; if (!nodeNum || nodeNum->type != cJSON_Number) {
if (strcmp(option, "numOfIps") != 0) return false; dError("failed to read mnode mgmtIpList.json, nodeNum not found");
tsDnodeMnodeIpList.numOfIps = (int8_t)value; goto PARSE_OVER;
}
tsMpeerInfos.nodeNum = nodeNum->valueint;
num = fscanf(fp, "%s %d", option, &value); cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
if (num != 2) return false; if (!nodeInfos || nodeInfos->type != cJSON_Array) {
if (strcmp(option, "port") != 0) return false; dError("failed to read mnode mgmtIpList.json, nodeInfos not found");
tsDnodeMnodeIpList.port = (uint16_t)value; goto PARSE_OVER;
}
for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { int size = cJSON_GetArraySize(nodeInfos);
num = fscanf(fp, "%s %d", option, &value); if (size != tsMpeerInfos.nodeNum) {
if (num != 2) return false; dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched");
if (strncmp(option, "ip", 2) != 0) return false; goto PARSE_OVER;
tsDnodeMnodeIpList.ip[i] = (uint32_t)value;
} }
fclose(fp); for (int i = 0; i < size; ++i) {
dPrint("read mnode iplist successed"); cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { if (nodeInfo == NULL) continue;
dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i]));
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
if (!nodeId || nodeId->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodeId not found");
goto PARSE_OVER;
}
tsMpeerInfos.nodeInfos[i].nodeId = nodeId->valueint;
cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeIp not found");
goto PARSE_OVER;
}
tsMpeerInfos.nodeInfos[i].nodeIp = inet_addr(nodeIp->valuestring);
cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "nodePort");
if (!nodePort || nodePort->type != cJSON_Number) {
dError("failed to read mnode mgmtIpList.json, nodePort not found");
goto PARSE_OVER;
}
tsMpeerInfos.nodeInfos[i].nodePort = (uint16_t)nodePort->valueint;
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
if (!nodeIp || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
dError("failed to read mnode mgmtIpList.json, nodeName not found");
goto PARSE_OVER;
}
strncpy(tsMpeerInfos.nodeInfos[i].nodeName, nodeName->valuestring, TSDB_NODE_NAME_LEN);
}
ret = true;
dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMpeerInfos.nodeNum, tsMpeerInfos.inUse);
for (int32_t i = 0; i < tsMpeerInfos.nodeNum; i++) {
dPrint("mnode:%d, ip:%s:%u name:%s", tsMpeerInfos.nodeInfos[i].nodeId,
taosIpStr(tsMpeerInfos.nodeInfos[i].nodeId), tsMpeerInfos.nodeInfos[i].nodePort,
tsMpeerInfos.nodeInfos[i].nodeName);
} }
return true; PARSE_OVER:
free(content);
fclose(fp);
return ret;
} }
static void dnodeSaveMnodeIpList() { static void dnodeSaveMnodeIpList() {
char ipFile[TSDB_FILENAME_LEN] = {0}; char ipFile[TSDB_FILENAME_LEN] = {0};
sprintf(ipFile, "%s/iplist", tsDnodeDir); sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
FILE *fp = fopen(ipFile, "w"); FILE *fp = fopen(ipFile, "w");
if (!fp) return; if (!fp) return;
fprintf(fp, "inUse %d\n", tsDnodeMnodeIpList.inUse); int32_t len = 0;
fprintf(fp, "numOfIps %d\n", tsDnodeMnodeIpList.numOfIps); int32_t maxLen = 2000;
fprintf(fp, "port %u\n", tsDnodeMnodeIpList.port); char * content = calloc(1, maxLen + 1);
for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) {
fprintf(fp, "ip%d %u\n", i, tsDnodeMnodeIpList.ip[i]); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMpeerInfos.inUse);
len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMpeerInfos.nodeNum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < tsMpeerInfos.nodeNum; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMpeerInfos.nodeInfos[i].nodeId);
len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", taosIpStr(tsMpeerInfos.nodeInfos[i].nodeIp));
len += snprintf(content + len, maxLen - len, " \"nodePort\": %u,\n", tsMpeerInfos.nodeInfos[i].nodePort);
len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", tsMpeerInfos.nodeInfos[i].nodeName);
if (i < tsMpeerInfos.nodeNum -1) {
len += snprintf(content + len, maxLen - len, " },{\n");
} else {
len += snprintf(content + len, maxLen - len, " }]\n");
} }
}
len += snprintf(content + len, maxLen - len, "}\n");
fwrite(content, 1, len, fp);
fclose(fp); fclose(fp);
free(content);
dPrint("save mnode iplist successed"); dPrint("save mnode iplist successed");
} }
uint32_t dnodeGetMnodeMasteIp() { uint32_t dnodeGetMnodeMasteIp() {
return tsDnodeMnodeIpList.ip[0]; return tsMpeerIpList.ip[tsMpeerIpList.inUse];
}
void* dnodeGetMpeerInfos() {
return &tsMpeerInfos;
} }
\ No newline at end of file
...@@ -52,8 +52,10 @@ typedef struct _mnode_obj { ...@@ -52,8 +52,10 @@ typedef struct _mnode_obj {
int32_t refCount; int32_t refCount;
int8_t role; int8_t role;
int8_t status; int8_t status;
uint16_t port;
uint32_t privateIp; uint32_t privateIp;
uint32_t publicIp; uint32_t publicIp;
char mnodeName[TSDB_NODE_NAME_LEN + 1];
} SMnodeObj; } SMnodeObj;
typedef struct _dnode_obj { typedef struct _dnode_obj {
......
...@@ -20,28 +20,30 @@ ...@@ -20,28 +20,30 @@
extern "C" { extern "C" {
#endif #endif
enum _TSDB_MN_STATUS { struct _mnode_obj;
TSDB_MN_STATUS_OFFLINE,
TSDB_MN_STATUS_UNSYNCED,
TSDB_MN_STATUS_SYNCING,
TSDB_MN_STATUS_SERVING
};
enum _TSDB_MN_ROLE { enum _TAOS_MN_STATUS {
TSDB_MN_ROLE_UNDECIDED, TAOS_MN_STATUS_OFFLINE,
TSDB_MN_ROLE_SLAVE, TAOS_MN_STATUS_DROPPING,
TSDB_MN_ROLE_MASTER TAOS_MN_STATUS_READY
}; };
int32_t mpeerInit(); int32_t mpeerInit();
void mpeerCleanup(); void mpeerCleanup();
int32_t mpeerGetMnodesNum();
void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode);
void mpeerReleaseMnode(struct _mnode_obj *pMnode);
bool mpeerInServerStatus(); bool mpeerInServerStatus();
bool mpeerIsMaster(); bool mpeerIsMaster();
bool mpeerCheckRedirect();
bool mpeerCheckRedirect(void *handle);
void mpeerGetPrivateIpList(SRpcIpSet *ipSet); void mpeerGetPrivateIpList(SRpcIpSet *ipSet);
void mpeerGetPublicIpList(SRpcIpSet *ipSet); void mpeerGetPublicIpList(SRpcIpSet *ipSet);
void mpeerGetMpeerInfos(void *mpeers);
char * mpeerGetMnodeStatusStr(int32_t status);
char * mpeerGetMnodeRoleStr(int32_t role);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -593,7 +593,20 @@ typedef struct { ...@@ -593,7 +593,20 @@ typedef struct {
} SDMStatusMsg; } SDMStatusMsg;
typedef struct { typedef struct {
SRpcIpSet ipList; int32_t nodeId;
uint32_t nodeIp;
uint16_t nodePort;
char nodeName[TSDB_NODE_NAME_LEN + 1];
} SDMNodeInfo;
typedef struct {
int8_t inUse;
int8_t nodeNum;
SDMNodeInfo nodeInfos[TSDB_MAX_MPEERS];
} SDMNodeInfos;
typedef struct {
SDMNodeInfos mpeers;
SDnodeState dnodeState; SDnodeState dnodeState;
SVnodeAccess vnodeAccess[]; SVnodeAccess vnodeAccess[];
} SDMStatusRsp; } SDMStatusRsp;
......
...@@ -678,8 +678,6 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) { ...@@ -678,8 +678,6 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) {
} }
static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) {
if (mpeerCheckRedirect(pMsg->thandle)) return;
SCMCreateDbMsg *pCreate = pMsg->pCont; SCMCreateDbMsg *pCreate = pMsg->pCont;
pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->maxSessions = htonl(pCreate->maxSessions);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
......
...@@ -141,8 +141,6 @@ static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { ...@@ -141,8 +141,6 @@ static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
} }
void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (mpeerCheckRedirect(rpcMsg->handle)) return;
SDMStatusMsg *pStatus = rpcMsg->pCont; SDMStatusMsg *pStatus = rpcMsg->pCont;
pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->privateIp = htonl(pStatus->privateIp); pStatus->privateIp = htonl(pStatus->privateIp);
...@@ -221,7 +219,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -221,7 +219,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return; return;
} }
mpeerGetPrivateIpList(&pRsp->ipList); mpeerGetMpeerInfos(&pRsp->mpeers);
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus);
......
...@@ -17,41 +17,36 @@ ...@@ -17,41 +17,36 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "trpc.h" #include "trpc.h"
#include "tsync.h"
#include "mpeer.h" #include "mpeer.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtUser.h" #include "mgmtUser.h"
#ifndef _MPEER extern int32_t mpeerInitMnodes();
extern void mpeerCleanupMnodes();
static SMnodeObj tsMnodeObj = {0};
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mpeerInit() { #ifndef _MPEER
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes);
static SMnodeObj tsMnodeObj = {0};
int32_t mpeerInitMnodes() {
tsMnodeObj.mnodeId = 1; tsMnodeObj.mnodeId = 1;
tsMnodeObj.dnodeId = 1;
tsMnodeObj.privateIp = inet_addr(tsPrivateIp); tsMnodeObj.privateIp = inet_addr(tsPrivateIp);
tsMnodeObj.publicIp = inet_addr(tsPublicIp); tsMnodeObj.publicIp = inet_addr(tsPublicIp);
tsMnodeObj.createdTime = taosGetTimestampMs(); tsMnodeObj.createdTime = taosGetTimestampMs();
tsMnodeObj.role = TSDB_MN_ROLE_MASTER; tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER;
tsMnodeObj.status = TSDB_MN_STATUS_SERVING; tsMnodeObj.status = TAOS_MN_STATUS_READY;
tsMnodeObj.port = tsMnodeDnodePort;
sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void mpeerCleanup() {} void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) {
bool mpeerInServerStatus() { return tsMnodeObj.status == TSDB_MN_STATUS_SERVING; }
bool mpeerIsMaster() { return tsMnodeObj.role == TSDB_MN_ROLE_MASTER; }
bool mpeerCheckRedirect(void *thandle) { return false; }
static int32_t mgmtGetMnodesNum() {
return 1;
}
static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
if (*pMnode == NULL) { if (*pMnode == NULL) {
*pMnode = &tsMnodeObj; *pMnode = &tsMnodeObj;
} else { } else {
...@@ -61,22 +56,74 @@ static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) { ...@@ -61,22 +56,74 @@ static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
return *pMnode; return *pMnode;
} }
char *taosGetMnodeStatusStr(int32_t mnodeStatus) { void mpeerGetPrivateIpList(SRpcIpSet *ipSet) {
switch (mnodeStatus) { ipSet->inUse = 0;
case TSDB_MN_STATUS_OFFLINE: return "offline"; ipSet->port = htons(tsMnodeDnodePort);
case TSDB_MN_STATUS_UNSYNCED: return "unsynced"; ipSet->numOfIps = 1;
case TSDB_MN_STATUS_SYNCING: return "syncing"; ipSet->ip[0] = htonl(tsMnodeObj.privateIp);
case TSDB_MN_STATUS_SERVING: return "serving"; }
default: return "undefined";
void mpeerGetPublicIpList(SRpcIpSet *ipSet) {
ipSet->inUse = 0;
ipSet->port = htons(tsMnodeDnodePort);
ipSet->numOfIps = 1;
ipSet->ip[0] = htonl(tsMnodeObj.publicIp);
}
void mpeerGetMpeerInfos(void *param) {
SDMNodeInfos *mpeers = param;
mpeers->nodeNum = 1;
mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId);
mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp);
mpeers->nodeInfos[0].nodePort = htons(tsMnodeObj.port);
strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName);
}
void mpeerCleanupDnodes() {}
int32_t mpeerGetMnodesNum() { return 1; }
void mpeerReleaseMnode(struct _mnode_obj *pMnode) {}
bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; }
bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; }
bool mpeerCheckRedirect() { return false; }
#endif
int32_t mpeerInit() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes);
return mpeerInitMnodes();
}
void mpeerCleanup() {
mpeerCleanupDnodes();
}
char *mpeerGetMnodeStatusStr(int32_t status) {
switch (status) {
case TAOS_MN_STATUS_OFFLINE:
return "offline";
case TAOS_MN_STATUS_DROPPING:
return "dropping";
case TAOS_MN_STATUS_READY:
return "ready";
default:
return "undefined";
} }
} }
char *taosGetMnodeRoleStr(int32_t mnodeRole) { char *mpeerGetMnodeRoleStr(int32_t role) {
switch (mnodeRole) { switch (role) {
case TSDB_MN_ROLE_UNDECIDED: return "undicided"; case TAOS_SYNC_ROLE_OFFLINE:
case TSDB_MN_ROLE_SLAVE: return "slave"; return "offline";
case TSDB_MN_ROLE_MASTER: return "master"; case TAOS_SYNC_ROLE_UNSYNCED:
default: return "undefined"; return "unsynced";
case TAOS_SYNC_ROLE_SLAVE:
return "slave";
case TAOS_SYNC_ROLE_MASTER:
return "master";
default:
return "undefined";
} }
} }
...@@ -133,7 +180,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -133,7 +180,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
} }
pShow->numOfRows = mgmtGetMnodesNum(); pShow->numOfRows = mpeerGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
mgmtReleaseUser(pUser); mgmtReleaseUser(pUser);
...@@ -149,7 +196,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -149,7 +196,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
char ipstr[32]; char ipstr[32];
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow->pNode, (SMnodeObj **)&pMnode); pShow->pNode = mpeerGetNextMnode(pShow->pNode, &pMnode);
if (pMnode == NULL) break; if (pMnode == NULL) break;
cols = 0; cols = 0;
...@@ -173,11 +220,11 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -173,11 +220,11 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetMnodeStatusStr(pMnode->status)); strcpy(pWrite, mpeerGetMnodeStatusStr(pMnode->status));
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, taosGetMnodeRoleStr(pMnode->role)); strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role));
cols++; cols++;
numOfRows++; numOfRows++;
...@@ -187,19 +234,3 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -187,19 +234,3 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
return numOfRows; return numOfRows;
} }
\ No newline at end of file
void mpeerGetPrivateIpList(SRpcIpSet *ipSet) {
ipSet->inUse = 0;
ipSet->port = htons(tsMnodeDnodePort);
ipSet->numOfIps = 1;
ipSet->ip[0] = htonl(tsMnodeObj.privateIp);
}
void mpeerGetPublicIpList(SRpcIpSet *ipSet) {
ipSet->inUse = 0;
ipSet->port = htons(tsMnodeDnodePort);
ipSet->numOfIps = 1;
ipSet->ip[0] = htonl(tsMnodeObj.publicIp);
}
#endif
\ No newline at end of file
...@@ -681,7 +681,6 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn ...@@ -681,7 +681,6 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mpeerCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
if (pUser == NULL) { if (pUser == NULL) {
...@@ -705,7 +704,6 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { ...@@ -705,7 +704,6 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mpeerCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
if (pUser == NULL) { if (pUser == NULL) {
...@@ -729,7 +727,6 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { ...@@ -729,7 +727,6 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
if (mpeerCheckRedirect(pMsg->thandle)) return;
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL);
if (pUser == NULL) { if (pUser == NULL) {
......
...@@ -142,7 +142,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -142,7 +142,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
if (mpeerCheckRedirect(rpcMsg->handle)) { if (mpeerCheckRedirect()) {
// rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册