diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeMClient.h index cab9ea9be4de577561b580e0376efcd721003370..ba638946318edcb38f45b0db75767f240ad5f23d 100644 --- a/src/dnode/inc/dnodeMClient.h +++ b/src/dnode/inc/dnodeMClient.h @@ -24,6 +24,7 @@ int32_t dnodeInitMClient(); void dnodeCleanupMClient(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); uint32_t dnodeGetMnodeMasteIp(); +void * dnodeGetMpeerInfos(); #ifdef __cplusplus } diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 5630872318a83ee4b59159af12e18fc7d7a49281..c09583bd65cc4fec01b62e8dc4295b5dab93bc45 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -15,33 +15,46 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "cJSON.h" #include "taosmsg.h" #include "tlog.h" #include "trpc.h" #include "tutil.h" +#include "tsync.h" #include "dnode.h" #include "dnodeMClient.h" #include "dnodeModule.h" #include "dnodeMgmt.h" +#define MPEER_CONTENT_LEN 2000 + static bool dnodeReadMnodeIpList(); static void dnodeSaveMnodeIpList(); static void dnodeProcessRspFromMnode(SRpcMsg *pMsg); static void dnodeProcessStatusRsp(SRpcMsg *pMsg); static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *); static void *tsDnodeMClientRpc = NULL; -static SRpcIpSet tsDnodeMnodeIpList = {0}; +static SRpcIpSet tsMpeerIpList = {0}; +static SDMNodeInfos tsMpeerInfos = {0}; int32_t dnodeInitMClient() { if (!dnodeReadMnodeIpList()) { - dTrace("failed to read mnode iplist, set it from cfg file"); - memset(&tsDnodeMnodeIpList, 0, sizeof(SRpcIpSet)); - tsDnodeMnodeIpList.port = tsMnodeDnodePort; - tsDnodeMnodeIpList.numOfIps = 1; - tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp); + memset(&tsMpeerIpList, 0, sizeof(SRpcIpSet)); + memset(&tsMpeerInfos, 0, sizeof(SDMNodeInfos)); + tsMpeerIpList.port = tsMnodeDnodePort; + tsMpeerIpList.numOfIps = 1; + tsMpeerIpList.ip[0] = inet_addr(tsMasterIp); if (tsSecondIp[0]) { - tsDnodeMnodeIpList.numOfIps = 2; - tsDnodeMnodeIpList.ip[1] = inet_addr(tsSecondIp); + tsMpeerIpList.numOfIps = 2; + tsMpeerIpList.ip[1] = inet_addr(tsSecondIp); + } + } else { + SRpcIpSet mgmtIpSet = {0}; + tsMpeerIpList.inUse = tsMpeerInfos.inUse; + tsMpeerIpList.numOfIps = tsMpeerInfos.nodeNum; + tsMpeerIpList.port = tsMpeerInfos.nodeInfos[0].nodePort; + for (int32_t i = 0; i < tsMpeerInfos.nodeNum; i++) { + tsMpeerIpList.ip[i] = tsMpeerInfos.nodeInfos[i].nodeIp; } } @@ -96,23 +109,31 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { } SDMStatusRsp *pStatusRsp = pMsg->pCont; - if (pStatusRsp->ipList.numOfIps <= 0) { - dError("status msg is invalid, num of ips is %d", pStatusRsp->ipList.numOfIps); + SDMNodeInfos *mpeers = &pStatusRsp->mpeers; + if (mpeers->nodeNum <= 0) { + dError("status msg is invalid, num of ips is %d", mpeers->nodeNum); return; } - pStatusRsp->ipList.port = htons(pStatusRsp->ipList.port); - for (int32_t i = 0; i < pStatusRsp->ipList.numOfIps; ++i) { - pStatusRsp->ipList.ip[i] = htonl(pStatusRsp->ipList.ip[i]); + SRpcIpSet mgmtIpSet = {0}; + mgmtIpSet.inUse = mpeers->inUse; + mgmtIpSet.numOfIps = mpeers->nodeNum; + mgmtIpSet.port = htons(mpeers->nodeInfos[0].nodePort); + for (int32_t i = 0; i < mpeers->nodeNum; i++) { + mgmtIpSet.ip[i] = htonl(mpeers->nodeInfos[i].nodeIp); } - //dTrace("status msg is received, result:%s", tstrerror(pMsg->code)); - - if (memcmp(&(pStatusRsp->ipList), &tsDnodeMnodeIpList, sizeof(SRpcIpSet)) != 0) { - dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", pStatusRsp->ipList.numOfIps, pStatusRsp->ipList.inUse); - memcpy(&tsDnodeMnodeIpList, &pStatusRsp->ipList, sizeof(SRpcIpSet)); - for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; ++i) { - dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); + if (memcmp(&mgmtIpSet, &tsMpeerIpList, sizeof(SRpcIpSet)) != 0) { + memcpy(&tsMpeerIpList, &mgmtIpSet, sizeof(SRpcIpSet)); + memcpy(&tsMpeerInfos, mpeers, sizeof(SDMNodeInfos)); + dPrint("mnode ip list is changed, numOfIps:%d inUse:%d", tsMpeerInfos.nodeNum, tsMpeerInfos.inUse); + for (int32_t i = 0; i < mpeers->nodeNum; i++) { + tsMpeerInfos.nodeInfos[i].nodeId = htonl(mpeers->nodeInfos[i].nodeId); + tsMpeerInfos.nodeInfos[i].nodeIp = htonl(mpeers->nodeInfos[i].nodeIp); + tsMpeerInfos.nodeInfos[i].nodePort = htons(mpeers->nodeInfos[i].nodePort); + dPrint("mnode:%d, ip:%s:%u name:%s", tsMpeerInfos.nodeInfos[i].nodeId, + taosIpStr(tsMpeerInfos.nodeInfos[i].nodeId), tsMpeerInfos.nodeInfos[i].nodePort, + tsMpeerInfos.nodeInfos[i].nodeName); } dnodeSaveMnodeIpList(); } @@ -129,70 +150,148 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { if (tsDnodeMClientRpc) { - rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg); + rpcSendRequest(tsDnodeMClientRpc, &tsMpeerIpList, rpcMsg); } } static bool dnodeReadMnodeIpList() { char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/iplist", tsDnodeDir); - + sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); FILE *fp = fopen(ipFile, "r"); - if (!fp) return false; - - char option[32] = {0}; - int32_t value = 0; - int32_t num = 0; - - num = fscanf(fp, "%s %d", option, &value); - if (num != 2) return false; - if (strcmp(option, "inUse") != 0) return false; - tsDnodeMnodeIpList.inUse = (int8_t)value;; + if (!fp) { + dTrace("failed to read mnode mgmtIpList.json, file not exist"); + return false; + } - num = fscanf(fp, "%s %d", option, &value); - if (num != 2) return false; - if (strcmp(option, "numOfIps") != 0) return false; - tsDnodeMnodeIpList.numOfIps = (int8_t)value; + bool ret = false; + int maxLen = 2000; + char *content = calloc(1, maxLen + 1); + int len = fread(content, 1, maxLen, fp); + if (len <= 0) { + free(content); + fclose(fp); + dError("failed to read mnode mgmtIpList.json, content is null"); + return false; + } - num = fscanf(fp, "%s %d", option, &value); - if (num != 2) return false; - if (strcmp(option, "port") != 0) return false; - tsDnodeMnodeIpList.port = (uint16_t)value; + cJSON* root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read mnode mgmtIpList.json, invalid json format"); + goto PARSE_OVER; + } - for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { - num = fscanf(fp, "%s %d", option, &value); - if (num != 2) return false; - if (strncmp(option, "ip", 2) != 0) return false; - tsDnodeMnodeIpList.ip[i] = (uint32_t)value; + cJSON* inUse = cJSON_GetObjectItem(root, "inUse"); + if (!inUse || inUse->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, inUse not found"); + goto PARSE_OVER; } + tsMpeerInfos.inUse = inUse->valueint; - fclose(fp); - dPrint("read mnode iplist successed"); - for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { - dPrint("mnode index:%d ip:%s", i, taosIpStr(tsDnodeMnodeIpList.ip[i])); - } + cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum"); + if (!nodeNum || nodeNum->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, nodeNum not found"); + goto PARSE_OVER; + } + tsMpeerInfos.nodeNum = nodeNum->valueint; + + cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); + if (!nodeInfos || nodeInfos->type != cJSON_Array) { + dError("failed to read mnode mgmtIpList.json, nodeInfos not found"); + goto PARSE_OVER; + } + + int size = cJSON_GetArraySize(nodeInfos); + if (size != tsMpeerInfos.nodeNum) { + dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched"); + goto PARSE_OVER; + } + + for (int i = 0; i < size; ++i) { + cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + if (nodeInfo == NULL) continue; + + cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); + if (!nodeId || nodeId->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, nodeId not found"); + goto PARSE_OVER; + } + tsMpeerInfos.nodeInfos[i].nodeId = nodeId->valueint; - return true; + cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp"); + if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) { + dError("failed to read mnode mgmtIpList.json, nodeIp not found"); + goto PARSE_OVER; + } + tsMpeerInfos.nodeInfos[i].nodeIp = inet_addr(nodeIp->valuestring); + + cJSON *nodePort = cJSON_GetObjectItem(nodeInfo, "nodePort"); + if (!nodePort || nodePort->type != cJSON_Number) { + dError("failed to read mnode mgmtIpList.json, nodePort not found"); + goto PARSE_OVER; + } + tsMpeerInfos.nodeInfos[i].nodePort = (uint16_t)nodePort->valueint; + + cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName"); + if (!nodeIp || nodeName->type != cJSON_String || nodeName->valuestring == NULL) { + dError("failed to read mnode mgmtIpList.json, nodeName not found"); + goto PARSE_OVER; + } + strncpy(tsMpeerInfos.nodeInfos[i].nodeName, nodeName->valuestring, TSDB_NODE_NAME_LEN); + } + + ret = true; + + dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMpeerInfos.nodeNum, tsMpeerInfos.inUse); + for (int32_t i = 0; i < tsMpeerInfos.nodeNum; i++) { + dPrint("mnode:%d, ip:%s:%u name:%s", tsMpeerInfos.nodeInfos[i].nodeId, + taosIpStr(tsMpeerInfos.nodeInfos[i].nodeId), tsMpeerInfos.nodeInfos[i].nodePort, + tsMpeerInfos.nodeInfos[i].nodeName); + } + +PARSE_OVER: + free(content); + fclose(fp); + return ret; } static void dnodeSaveMnodeIpList() { char ipFile[TSDB_FILENAME_LEN] = {0}; - sprintf(ipFile, "%s/iplist", tsDnodeDir); - + sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir); FILE *fp = fopen(ipFile, "w"); if (!fp) return; - fprintf(fp, "inUse %d\n", tsDnodeMnodeIpList.inUse); - fprintf(fp, "numOfIps %d\n", tsDnodeMnodeIpList.numOfIps); - fprintf(fp, "port %u\n", tsDnodeMnodeIpList.port); - for (int32_t i = 0; i < tsDnodeMnodeIpList.numOfIps; i++) { - fprintf(fp, "ip%d %u\n", i, tsDnodeMnodeIpList.ip[i]); + int32_t len = 0; + int32_t maxLen = 2000; + char * content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMpeerInfos.inUse); + len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMpeerInfos.nodeNum); + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); + for (int32_t i = 0; i < tsMpeerInfos.nodeNum; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMpeerInfos.nodeInfos[i].nodeId); + len += snprintf(content + len, maxLen - len, " \"nodeIp\": \"%s\",\n", taosIpStr(tsMpeerInfos.nodeInfos[i].nodeIp)); + len += snprintf(content + len, maxLen - len, " \"nodePort\": %u,\n", tsMpeerInfos.nodeInfos[i].nodePort); + len += snprintf(content + len, maxLen - len, " \"nodeName\": \"%s\"\n", tsMpeerInfos.nodeInfos[i].nodeName); + if (i < tsMpeerInfos.nodeNum -1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } } - + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); fclose(fp); + free(content); + dPrint("save mnode iplist successed"); } uint32_t dnodeGetMnodeMasteIp() { - return tsDnodeMnodeIpList.ip[0]; + return tsMpeerIpList.ip[tsMpeerIpList.inUse]; +} + +void* dnodeGetMpeerInfos() { + return &tsMpeerInfos; } \ No newline at end of file diff --git a/src/inc/mnode.h b/src/inc/mnode.h index b19e91a90e886ed8c5d00866dbf7204163246dad..f2c072453f8d552ba93486948e177d9c6773de83 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -52,8 +52,10 @@ typedef struct _mnode_obj { int32_t refCount; int8_t role; int8_t status; + uint16_t port; uint32_t privateIp; uint32_t publicIp; + char mnodeName[TSDB_NODE_NAME_LEN + 1]; } SMnodeObj; typedef struct _dnode_obj { diff --git a/src/inc/mpeer.h b/src/inc/mpeer.h index 7007f19226381dfbb5955094070954c497c815a9..157ea40119963062da7b8ba1070c8e2034f0d051 100644 --- a/src/inc/mpeer.h +++ b/src/inc/mpeer.h @@ -20,28 +20,30 @@ extern "C" { #endif -enum _TSDB_MN_STATUS { - TSDB_MN_STATUS_OFFLINE, - TSDB_MN_STATUS_UNSYNCED, - TSDB_MN_STATUS_SYNCING, - TSDB_MN_STATUS_SERVING -}; +struct _mnode_obj; -enum _TSDB_MN_ROLE { - TSDB_MN_ROLE_UNDECIDED, - TSDB_MN_ROLE_SLAVE, - TSDB_MN_ROLE_MASTER +enum _TAOS_MN_STATUS { + TAOS_MN_STATUS_OFFLINE, + TAOS_MN_STATUS_DROPPING, + TAOS_MN_STATUS_READY }; int32_t mpeerInit(); void mpeerCleanup(); +int32_t mpeerGetMnodesNum(); +void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode); +void mpeerReleaseMnode(struct _mnode_obj *pMnode); bool mpeerInServerStatus(); bool mpeerIsMaster(); +bool mpeerCheckRedirect(); -bool mpeerCheckRedirect(void *handle); void mpeerGetPrivateIpList(SRpcIpSet *ipSet); void mpeerGetPublicIpList(SRpcIpSet *ipSet); +void mpeerGetMpeerInfos(void *mpeers); + +char * mpeerGetMnodeStatusStr(int32_t status); +char * mpeerGetMnodeRoleStr(int32_t role); #ifdef __cplusplus } diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 09d306ffad6f7cdced85923fa55e3c2d03b74061..5378bf4bae181550938c43acb7657ac3c2860206 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -593,7 +593,20 @@ typedef struct { } SDMStatusMsg; typedef struct { - SRpcIpSet ipList; + int32_t nodeId; + uint32_t nodeIp; + uint16_t nodePort; + char nodeName[TSDB_NODE_NAME_LEN + 1]; +} SDMNodeInfo; + +typedef struct { + int8_t inUse; + int8_t nodeNum; + SDMNodeInfo nodeInfos[TSDB_MAX_MPEERS]; +} SDMNodeInfos; + +typedef struct { + SDMNodeInfos mpeers; SDnodeState dnodeState; SVnodeAccess vnodeAccess[]; } SDMStatusRsp; diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 0f18de32532c2cde3e161e93665f533a186b535d..b1931347a7f5f39b557e9b82210e35df0a4f1f66 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -678,8 +678,6 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) { } static void mgmtProcessCreateDbMsg(SQueuedMsg *pMsg) { - if (mpeerCheckRedirect(pMsg->thandle)) return; - SCMCreateDbMsg *pCreate = pMsg->pCont; pCreate->maxSessions = htonl(pCreate->maxSessions); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 1d32ded088ebf79668f36deb589e94e6b0d2ae45..51ac4e842dd8309667625218d020374f97059769 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -141,8 +141,6 @@ static void clusterProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) { } void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { - if (mpeerCheckRedirect(rpcMsg->handle)) return; - SDMStatusMsg *pStatus = rpcMsg->pCont; pStatus->dnodeId = htonl(pStatus->dnodeId); pStatus->privateIp = htonl(pStatus->privateIp); @@ -221,7 +219,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { return; } - mpeerGetPrivateIpList(&pRsp->ipList); + mpeerGetMpeerInfos(&pRsp->mpeers); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index c349969019688b081dd21e66a9d4e794fc0c209a..82da45479380d04f5ea089c7ca32af988068aa10 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -17,41 +17,36 @@ #include "os.h" #include "taoserror.h" #include "trpc.h" +#include "tsync.h" #include "mpeer.h" #include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" -#ifndef _MPEER - -static SMnodeObj tsMnodeObj = {0}; +extern int32_t mpeerInitMnodes(); +extern void mpeerCleanupMnodes(); static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -int32_t mpeerInit() { - mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); - mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); +#ifndef _MPEER +static SMnodeObj tsMnodeObj = {0}; + +int32_t mpeerInitMnodes() { tsMnodeObj.mnodeId = 1; + tsMnodeObj.dnodeId = 1; tsMnodeObj.privateIp = inet_addr(tsPrivateIp); tsMnodeObj.publicIp = inet_addr(tsPublicIp); tsMnodeObj.createdTime = taosGetTimestampMs(); - tsMnodeObj.role = TSDB_MN_ROLE_MASTER; - tsMnodeObj.status = TSDB_MN_STATUS_SERVING; + tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER; + tsMnodeObj.status = TAOS_MN_STATUS_READY; + tsMnodeObj.port = tsMnodeDnodePort; + sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId); return TSDB_CODE_SUCCESS; } -void mpeerCleanup() {} -bool mpeerInServerStatus() { return tsMnodeObj.status == TSDB_MN_STATUS_SERVING; } -bool mpeerIsMaster() { return tsMnodeObj.role == TSDB_MN_ROLE_MASTER; } -bool mpeerCheckRedirect(void *thandle) { return false; } - -static int32_t mgmtGetMnodesNum() { - return 1; -} - -static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) { +void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { if (*pMnode == NULL) { *pMnode = &tsMnodeObj; } else { @@ -61,22 +56,74 @@ static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) { return *pMnode; } -char *taosGetMnodeStatusStr(int32_t mnodeStatus) { - switch (mnodeStatus) { - case TSDB_MN_STATUS_OFFLINE: return "offline"; - case TSDB_MN_STATUS_UNSYNCED: return "unsynced"; - case TSDB_MN_STATUS_SYNCING: return "syncing"; - case TSDB_MN_STATUS_SERVING: return "serving"; - default: return "undefined"; +void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { + ipSet->inUse = 0; + ipSet->port = htons(tsMnodeDnodePort); + ipSet->numOfIps = 1; + ipSet->ip[0] = htonl(tsMnodeObj.privateIp); +} + +void mpeerGetPublicIpList(SRpcIpSet *ipSet) { + ipSet->inUse = 0; + ipSet->port = htons(tsMnodeDnodePort); + ipSet->numOfIps = 1; + ipSet->ip[0] = htonl(tsMnodeObj.publicIp); +} + +void mpeerGetMpeerInfos(void *param) { + SDMNodeInfos *mpeers = param; + mpeers->nodeNum = 1; + mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId); + mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp); + mpeers->nodeInfos[0].nodePort = htons(tsMnodeObj.port); + strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); +} + +void mpeerCleanupDnodes() {} +int32_t mpeerGetMnodesNum() { return 1; } +void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} +bool mpeerInServerStatus() { return tsMnodeObj.status == TAOS_MN_STATUS_READY; } +bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } +bool mpeerCheckRedirect() { return false; } + +#endif + +int32_t mpeerInit() { + mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); + mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); + + return mpeerInitMnodes(); +} + +void mpeerCleanup() { + mpeerCleanupDnodes(); +} + +char *mpeerGetMnodeStatusStr(int32_t status) { + switch (status) { + case TAOS_MN_STATUS_OFFLINE: + return "offline"; + case TAOS_MN_STATUS_DROPPING: + return "dropping"; + case TAOS_MN_STATUS_READY: + return "ready"; + default: + return "undefined"; } } -char *taosGetMnodeRoleStr(int32_t mnodeRole) { - switch (mnodeRole) { - case TSDB_MN_ROLE_UNDECIDED: return "undicided"; - case TSDB_MN_ROLE_SLAVE: return "slave"; - case TSDB_MN_ROLE_MASTER: return "master"; - default: return "undefined"; +char *mpeerGetMnodeRoleStr(int32_t role) { + switch (role) { + case TAOS_SYNC_ROLE_OFFLINE: + return "offline"; + case TAOS_SYNC_ROLE_UNSYNCED: + return "unsynced"; + case TAOS_SYNC_ROLE_SLAVE: + return "slave"; + case TAOS_SYNC_ROLE_MASTER: + return "master"; + default: + return "undefined"; } } @@ -133,7 +180,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } - pShow->numOfRows = mgmtGetMnodesNum(); + pShow->numOfRows = mpeerGetMnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; mgmtReleaseUser(pUser); @@ -149,7 +196,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi char ipstr[32]; while (numOfRows < rows) { - pShow->pNode = mgmtGetNextMnode(pShow->pNode, (SMnodeObj **)&pMnode); + pShow->pNode = mpeerGetNextMnode(pShow->pNode, &pMnode); if (pMnode == NULL) break; cols = 0; @@ -173,11 +220,11 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, taosGetMnodeStatusStr(pMnode->status)); + strcpy(pWrite, mpeerGetMnodeStatusStr(pMnode->status)); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, taosGetMnodeRoleStr(pMnode->role)); + strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role)); cols++; numOfRows++; @@ -186,20 +233,4 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi pShow->numOfReads += numOfRows; return numOfRows; -} - -void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { - ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); - ipSet->numOfIps = 1; - ipSet->ip[0] = htonl(tsMnodeObj.privateIp); -} - -void mpeerGetPublicIpList(SRpcIpSet *ipSet) { - ipSet->inUse = 0; - ipSet->port = htons(tsMnodeDnodePort); - ipSet->numOfIps = 1; - ipSet->ip[0] = htonl(tsMnodeObj.publicIp); -} - -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 36481b81b090c4b5c54929f6c999c50e41beba93..f7dec4656b668769325273eadd7d2572a049c7b7 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -681,8 +681,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mpeerCheckRedirect(pMsg->thandle)) return; - + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; @@ -705,8 +704,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) { void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mpeerCheckRedirect(pMsg->thandle)) return; - + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; @@ -729,8 +727,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) { void mgmtProcessKillConnectionMsg(SQueuedMsg *pMsg) { SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; - if (mpeerCheckRedirect(pMsg->thandle)) return; - + SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle, NULL); if (pUser == NULL) { rpcRsp.code = TSDB_CODE_INVALID_USER; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index eb20225e95693080a085e909219d43ae482ad71f..dbd7627d3f1a81c41e9341c0daef954911445e43 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -142,7 +142,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (mpeerCheckRedirect(rpcMsg->handle)) { + if (mpeerCheckRedirect()) { // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); rpcFreeCont(rpcMsg->pCont); diff --git a/src/plugins/http/inc/cJSON.h b/src/util/inc/cJSON.h similarity index 100% rename from src/plugins/http/inc/cJSON.h rename to src/util/inc/cJSON.h diff --git a/src/plugins/http/src/cJSON.c b/src/util/src/cJSON.c similarity index 100% rename from src/plugins/http/src/cJSON.c rename to src/util/src/cJSON.c