提交 0f202d9f 编写于 作者: S slguan

rearrage some codes

上级 fe12d0fe
CMAKE_MINIMUM_REQUIRED(VERSION 2.8) CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
PROJECT(TDengine) PROJECT(TDengine)
IF (TD_CLUSTER) IF (TD_SYNC)
ADD_DEFINITIONS(-D_CLUSTER) ADD_DEFINITIONS(-D_SYNC)
ENDIF ()
IF (TD_MPEER)
ADD_DEFINITIONS(-D_MPEER)
ENDIF ()
IF (TD_VPEER)
ADD_DEFINITIONS(-D_VPEER)
#ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=3)
ELSE ()
#ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=1)
ENDIF () ENDIF ()
IF (TD_ACCOUNT) IF (TD_ACCOUNT)
ADD_DEFINITIONS(-D_ACCOUNT) ADD_DEFINITIONS(-D_ACCOUNT)
ENDIF () ENDIF ()
IF (TD_ADMIN)
ADD_DEFINITIONS(-D_ADMIN)
ENDIF ()
IF (TD_GRANT) IF (TD_GRANT)
ADD_DEFINITIONS(-D_GRANT) ADD_DEFINITIONS(-D_GRANT)
ENDIF () ENDIF ()
......
...@@ -21,20 +21,13 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -21,20 +21,13 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
IF (TD_ACCOUNT) IF (TD_ACCOUNT)
TARGET_LINK_LIBRARIES(taosd account) TARGET_LINK_LIBRARIES(taosd account)
ENDIF () ENDIF ()
IF (TD_GRANT) IF (TD_GRANT)
TARGET_LINK_LIBRARIES(taosd grant) TARGET_LINK_LIBRARIES(taosd grant)
ENDIF () ENDIF ()
IF (TD_CLUSTER) IF (TD_SYNC)
TARGET_LINK_LIBRARIES(taosd cluster) TARGET_LINK_LIBRARIES(taosd replica sync)
ENDIF ()
IF (TD_VPEER)
TARGET_LINK_LIBRARIES(taosd balance sync)
ENDIF ()
IF (TD_MPEER)
TARGET_LINK_LIBRARIES(taosd mpeer sync)
ENDIF () ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_CMD "prepare_env_cmd")
......
...@@ -23,12 +23,12 @@ ...@@ -23,12 +23,12 @@
#include "tsync.h" #include "tsync.h"
#include "ttime.h" #include "ttime.h"
#include "ttimer.h" #include "ttimer.h"
#include "treplica.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeMClient.h" #include "dnodeMClient.h"
#include "dnodeModule.h" #include "dnodeModule.h"
#include "dnodeMgmt.h" #include "dnodeMgmt.h"
#include "vnode.h" #include "vnode.h"
#include "mpeer.h"
#define MPEER_CONTENT_LEN 2000 #define MPEER_CONTENT_LEN 2000
...@@ -181,7 +181,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { ...@@ -181,7 +181,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
tsMnodeInfos.nodeInfos[i].nodeName); tsMnodeInfos.nodeInfos[i].nodeName);
} }
dnodeSaveMnodeIpList(); dnodeSaveMnodeIpList();
mpeerUpdateSync(); replicaNotify();
} }
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
......
...@@ -43,19 +43,6 @@ struct _acct_obj; ...@@ -43,19 +43,6 @@ struct _acct_obj;
struct _user_obj; struct _user_obj;
struct _mnode_obj; struct _mnode_obj;
typedef struct _mnode_obj {
int32_t mnodeId;
int64_t createdTime;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
uint32_t privateIp;
uint32_t publicIp;
uint16_t port;
int8_t role;
char mnodeName[TSDB_NODE_NAME_LEN + 1];
} SMnodeObj;
typedef struct _dnode_obj { typedef struct _dnode_obj {
int32_t dnodeId; int32_t dnodeId;
uint32_t privateIp; uint32_t privateIp;
...@@ -88,6 +75,17 @@ typedef struct _dnode_obj { ...@@ -88,6 +75,17 @@ typedef struct _dnode_obj {
int16_t bandwidthUsage; // calc from sys.band int16_t bandwidthUsage; // calc from sys.band
} SDnodeObj; } SDnodeObj;
typedef struct _mnode_obj {
int32_t mnodeId;
int64_t createdTime;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
int8_t role;
SDnodeObj *pDnode;
} SMnodeObj;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
uint32_t privateIp; uint32_t privateIp;
......
...@@ -13,27 +13,23 @@ ...@@ -13,27 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_BALANCE_H #ifndef TDENGINE_REPLICA_H
#define TDENGINE_BALANCE_H #define TDENGINE_REPLICA_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
struct _db_obj;
struct _vg_obj; struct _vg_obj;
struct _dnode_obj; struct _dnode_obj;
int32_t balanceInit(); int32_t replicaInit();
void balanceCleanUp(); void replicaCleanUp();
void balanceNotify(); void replicaNotify();
void balanceReset(); void replicaReset();
int32_t balanceAllocVnodes(struct _vg_obj *pVgroup); int32_t replicaAllocVnodes(struct _vg_obj *pVgroup);
int32_t balanceDropDnode(struct _dnode_obj *pDnode); int32_t replicaForwardReqToPeer(void *pHead);
int32_t replicaDropDnode(struct _dnode_obj *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_MPEER_H #ifndef TDENGINE_MGMT_MNODE_H
#define TDENGINE_MPEER_H #define TDENGINE_MGMT_MNODE_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -28,29 +28,21 @@ enum _TAOS_MN_STATUS { ...@@ -28,29 +28,21 @@ enum _TAOS_MN_STATUS {
TAOS_MN_STATUS_READY TAOS_MN_STATUS_READY
}; };
// general implementation int32_t mgmtInitMnodes();
int32_t mpeerInit(); void mgmtCleanupMnodes();
void mpeerCleanup();
// special implementation int32_t mgmtAddMnode(int32_t dnodeId);
int32_t mpeerInitMnodes(); int32_t mgmtDropMnode(int32_t dnodeId);
void mpeerCleanupMnodes();
int32_t mpeerAddMnode(int32_t dnodeId);
int32_t mpeerRemoveMnode(int32_t dnodeId);
void * mpeerGetMnode(int32_t mnodeId); void * mgmtGetMnode(int32_t mnodeId);
int32_t mpeerGetMnodesNum(); int32_t mgmtGetMnodesNum();
void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode); void * mgmtGetNextMnode(void *pNode, struct _mnode_obj **pMnode);
void mpeerReleaseMnode(struct _mnode_obj *pMnode); void mgmtReleaseMnode(struct _mnode_obj *pMnode);
bool mpeerIsMaster(); bool mgmtIsMaster();
void mpeerGetPrivateIpList(SRpcIpSet *ipSet); void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp);
void mpeerGetPublicIpList(SRpcIpSet *ipSet); void mgmtGetMnodeList(void *mpeers);
void mpeerGetMpeerInfos(void *mpeers);
int32_t mpeerForwardReqToPeer(void *pHead);
void mpeerUpdateSync();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "tutil.h" #include "tutil.h"
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "tbalance.h" #include "mgmtMnode.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "tgrant.h" #include "tgrant.h"
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
#include "tutil.h" #include "tutil.h"
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "tbalance.h" #include "treplica.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "tgrant.h" #include "tgrant.h"
......
...@@ -19,12 +19,11 @@ ...@@ -19,12 +19,11 @@
#include "tutil.h" #include "tutil.h"
#include "name.h" #include "name.h"
#include "mnode.h" #include "mnode.h"
#include "tbalance.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "tgrant.h" #include "tgrant.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
......
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tmodule.h" #include "tmodule.h"
#include "tbalance.h"
#include "tgrant.h" #include "tgrant.h"
#include "mgmtDnode.h" #include "treplica.h"
#include "mnode.h" #include "mnode.h"
#include "mpeer.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtUser.h" #include "mgmtUser.h"
...@@ -119,13 +119,15 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { ...@@ -119,13 +119,15 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) {
static int32_t mgmtDnodeActionRestored() { static int32_t mgmtDnodeActionRestored() {
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
if (numOfRows <= 0) { if (numOfRows <= 0 && strcmp(tsMasterIp, tsPrivateIp) == 0) {
if (strcmp(tsMasterIp, tsPrivateIp) == 0) { uint32_t ip = inet_addr(tsPrivateIp);
mgmtCreateDnode(inet_addr(tsPrivateIp)); mgmtCreateDnode(ip);
} SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
mgmtAddMnode(pDnode->dnodeId);
mgmtReleaseDnode(pDnode);
} }
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtInitDnodes() { int32_t mgmtInitDnodes() {
...@@ -326,7 +328,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -326,7 +328,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId); mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TAOS_DN_STATUS_READY; pDnode->status = TAOS_DN_STATUS_READY;
balanceNotify(); replicaNotify();
mgmtMonitorDnodeModule(); mgmtMonitorDnodeModule();
} }
...@@ -339,7 +341,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { ...@@ -339,7 +341,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return; return;
} }
mpeerGetMpeerInfos(&pRsp->mpeers); mgmtGetMnodeList(&pRsp->mpeers);
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus);
...@@ -417,7 +419,7 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) { ...@@ -417,7 +419,7 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) {
return code; return code;
} }
static int32_t clusterDropDnodeByIp(uint32_t ip) { static int32_t mgmtDropDnodeByIp(uint32_t ip) {
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
if (pDnode == NULL) { if (pDnode == NULL) {
mError("dnode:%s, is not exist", taosIpStr(ip)); mError("dnode:%s, is not exist", taosIpStr(ip));
...@@ -465,7 +467,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { ...@@ -465,7 +467,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) {
rpcRsp.code = TSDB_CODE_NO_RIGHTS; rpcRsp.code = TSDB_CODE_NO_RIGHTS;
} else { } else {
uint32_t ip = inet_addr(pDrop->ip); uint32_t ip = inet_addr(pDrop->ip);
rpcRsp.code = clusterDropDnodeByIp(ip); rpcRsp.code = mgmtDropDnodeByIp(ip);
if (rpcRsp.code == TSDB_CODE_SUCCESS) { if (rpcRsp.code == TSDB_CODE_SUCCESS) {
mLPrint("dnode:%s is dropped by %s", pDrop->ip, pMsg->pUser->user); mLPrint("dnode:%s is dropped by %s", pDrop->ip, pMsg->pUser->user);
} else { } else {
...@@ -709,7 +711,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo ...@@ -709,7 +711,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
return numOfRows; return numOfRows;
} }
static bool clusterCheckConfigShow(SGlobalConfig *cfg) { static bool mgmtCheckConfigShow(SGlobalConfig *cfg) {
if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW)) if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW))
return false; return false;
return true; return true;
...@@ -746,7 +748,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -746,7 +748,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->numOfRows = 0; pShow->numOfRows = 0;
for (int32_t i = tsGlobalConfigNum - 1; i >= 0; --i) { for (int32_t i = tsGlobalConfigNum - 1; i >= 0; --i) {
SGlobalConfig *cfg = tsGlobalConfig + i; SGlobalConfig *cfg = tsGlobalConfig + i;
if (!clusterCheckConfigShow(cfg)) continue; if (!mgmtCheckConfigShow(cfg)) continue;
pShow->numOfRows++; pShow->numOfRows++;
} }
...@@ -762,7 +764,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -762,7 +764,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) { for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) {
SGlobalConfig *cfg = tsGlobalConfig + i; SGlobalConfig *cfg = tsGlobalConfig + i;
if (!clusterCheckConfigShow(cfg)) continue; if (!mgmtCheckConfigShow(cfg)) continue;
char *pWrite; char *pWrite;
int32_t cols = 0; int32_t cols = 0;
...@@ -924,7 +926,7 @@ static void clusterSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { ...@@ -924,7 +926,7 @@ static void clusterSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
mgmtUpdateDnode(pDnode); mgmtUpdateDnode(pDnode);
if (moduleType == TSDB_MOD_MGMT) { if (moduleType == TSDB_MOD_MGMT) {
mpeerAddMnode(pDnode->dnodeId); mgmtAddMnode(pDnode->dnodeId);
mPrint("dnode:%d, add it into mnode list", pDnode->dnodeId); mPrint("dnode:%d, add it into mnode list", pDnode->dnodeId);
} }
} }
...@@ -934,7 +936,7 @@ static void clusterUnSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { ...@@ -934,7 +936,7 @@ static void clusterUnSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
mgmtUpdateDnode(pDnode); mgmtUpdateDnode(pDnode);
if (moduleType == TSDB_MOD_MGMT) { if (moduleType == TSDB_MOD_MGMT) {
mpeerRemoveMnode(pDnode->dnodeId); mgmtDropMnode(pDnode->dnodeId);
mPrint("dnode:%d, remove it from mnode list", pDnode->dnodeId); mPrint("dnode:%d, remove it from mnode list", pDnode->dnodeId);
} }
} }
......
...@@ -20,10 +20,10 @@ ...@@ -20,10 +20,10 @@
#include "tsched.h" #include "tsched.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "tbalance.h" #include "treplica.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "tgrant.h" #include "tgrant.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
...@@ -109,7 +109,7 @@ int32_t mgmtStartSystem() { ...@@ -109,7 +109,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (mpeerInit() < 0) { if (mgmtInitMnodes() < 0) {
mError("failed to init mpeers"); mError("failed to init mpeers");
return -1; return -1;
} }
...@@ -127,7 +127,7 @@ int32_t mgmtStartSystem() { ...@@ -127,7 +127,7 @@ int32_t mgmtStartSystem() {
return -1; return -1;
} }
if (balanceInit() < 0) { if (replicaInit() < 0) {
mError("failed to init dnode balance") mError("failed to init dnode balance")
} }
...@@ -140,7 +140,7 @@ int32_t mgmtStartSystem() { ...@@ -140,7 +140,7 @@ int32_t mgmtStartSystem() {
void mgmtStopSystem() { void mgmtStopSystem() {
if (mpeerIsMaster()) { if (mgmtIsMaster()) {
mTrace("it is a master mgmt node, it could not be stopped"); mTrace("it is a master mgmt node, it could not be stopped");
return; return;
} }
...@@ -152,8 +152,8 @@ void mgmtStopSystem() { ...@@ -152,8 +152,8 @@ void mgmtStopSystem() {
void mgmtCleanUpSystem() { void mgmtCleanUpSystem() {
mPrint("starting to clean up mgmt"); mPrint("starting to clean up mgmt");
grantCleanUp(); grantCleanUp();
mpeerCleanup(); mgmtCleanupMnodes();
balanceCleanUp(); replicaCleanUp();
mgmtCleanUpShell(); mgmtCleanUpShell();
mgmtCleanupDClient(); mgmtCleanupDClient();
mgmtCleanupDServer(); mgmtCleanupDServer();
......
...@@ -16,91 +16,129 @@ ...@@ -16,91 +16,129 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tmodule.h"
#include "trpc.h" #include "trpc.h"
#include "tsync.h" #include "tsync.h"
#include "mpeer.h" #include "treplica.h"
#include "mnode.h"
#include "mgmtMnode.h"
#include "mgmtDnode.h"
#include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtUser.h" #include "mgmtUser.h"
static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0;
static int32_t tsMnodeIsMaster = true;
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifndef _MPEER static int32_t mgmtMnodeActionDestroy(SSdbOperDesc *pOper) {
tfree(pOper->pObj);
static SMnodeObj tsMnodeObj = {0}; return TSDB_CODE_SUCCESS;
}
int32_t mpeerInitMnodes() { static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) {
tsMnodeObj.mnodeId = 1; SMnodeObj *pMnode = pOper->pObj;
tsMnodeObj.privateIp = inet_addr(tsPrivateIp); SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
tsMnodeObj.publicIp = inet_addr(tsPublicIp); if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
tsMnodeObj.createdTime = taosGetTimestampMs(); pMnode->pDnode = pDnode;
tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER; mgmtReleaseDnode(pDnode);
tsMnodeObj.port = tsMnodeDnodePort;
sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void mpeerCleanupMnodes() {} static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) {
int32_t mpeerAddMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } SMnodeObj *pMnode = pOper->pObj;
int32_t mpeerRemoveMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; } return TSDB_CODE_SUCCESS;
int32_t mpeerGetMnodesNum() { return 1; } }
void mpeerReleaseMnode(struct _mnode_obj *pMnode) {}
bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; }
void mpeerUpdateSync() {}
void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { static int32_t mgmtMnodeActionUpdate(SSdbOperDesc *pOper) {
if (*pMnode == NULL) { SMnodeObj *pMnode = pOper->pObj;
*pMnode = &tsMnodeObj; SMnodeObj *pSaved = mgmtGetMnode(pMnode->mnodeId);
} else { if (pMnode != pSaved) {
*pMnode = NULL; memcpy(pSaved, pMnode, pOper->rowSize);
free(pMnode);
} }
return *pMnode; return TSDB_CODE_SUCCESS;
} }
void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { static int32_t mgmtMnodeActionEncode(SSdbOperDesc *pOper) {
ipSet->inUse = 0; SMnodeObj *pMnode = pOper->pObj;
ipSet->numOfIps = 1; memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize);
ipSet->port = htons(tsMnodeObj.port); pOper->rowSize = tsMnodeUpdateSize;
ipSet->ip[0] = htonl(tsMnodeObj.privateIp); return TSDB_CODE_SUCCESS;
} }
void mpeerGetPublicIpList(SRpcIpSet *ipSet) { static int32_t mgmtMnodeActionDecode(SSdbOperDesc *pOper) {
ipSet->inUse = 0; SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
ipSet->numOfIps = 1; if (pMnode == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
ipSet->port = htons(tsMnodeObj.port);
ipSet->ip[0] = htonl(tsMnodeObj.publicIp);
}
void mpeerGetMpeerInfos(void *param) { memcpy(pMnode, pOper->rowData, tsMnodeUpdateSize);
SDMNodeInfos *mpeers = param; pOper->pObj = pMnode;
mpeers->inUse = 0; return TSDB_CODE_SUCCESS;
mpeers->nodeNum = 1;
mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId);
mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp);
mpeers->nodeInfos[0].nodePort = htons(tsMnodeObj.port);
strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName);
} }
int32_t mpeerForwardReqToPeer(void *pHead) { static int32_t mgmtMnodeActionRestored() {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif int32_t mgmtInitMnodes() {
SMnodeObj tObj;
tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj;
SSdbTableDesc tableDesc = {
.tableId = SDB_TABLE_MNODE,
.tableName = "mnodes",
.hashSessions = TSDB_MAX_MNODES,
.maxRowSize = tsMnodeUpdateSize,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_INT,
.insertFp = mgmtMnodeActionInsert,
.deleteFp = mgmtMnodeActionDelete,
.updateFp = mgmtMnodeActionUpdate,
.encodeFp = mgmtMnodeActionEncode,
.decodeFp = mgmtMnodeActionDecode,
.destroyFp = mgmtMnodeActionDestroy,
.restoredFp = mgmtMnodeActionRestored
};
tsMnodeSdb = sdbOpenTable(&tableDesc);
if (tsMnodeSdb == NULL) {
mError("failed to init mnodes data");
return -1;
}
int32_t mpeerInit() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes);
return mpeerInitMnodes();
mTrace("mnodes table is created");
return TSDB_CODE_SUCCESS;
}
void mgmtCleanupMnodes() {
sdbCloseTable(tsMnodeSdb);
}
int32_t mgmtGetMnodesNum() {
return sdbGetNumOfRows(tsMnodeSdb);
}
void *mgmtGetMnode(int32_t mnodeId) {
return sdbGetRow(tsMnodeSdb, &mnodeId);
}
void mgmtReleaseMnode(struct _mnode_obj *pMnode) {
sdbDecRef(tsMnodeSdb, pMnode);
} }
void mpeerCleanup() { void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
mpeerCleanupMnodes(); return sdbFetchRow(tsMnodeSdb, pNode, (void **)pMnode);
} }
static char *mpeerGetMnodeRoleStr(int32_t role) { static char *mgmtGetMnodeRoleStr(int32_t role) {
switch (role) { switch (role) {
case TAOS_SYNC_ROLE_OFFLINE: case TAOS_SYNC_ROLE_OFFLINE:
return "offline"; return "offline";
...@@ -115,6 +153,101 @@ static char *mpeerGetMnodeRoleStr(int32_t role) { ...@@ -115,6 +153,101 @@ static char *mpeerGetMnodeRoleStr(int32_t role) {
} }
} }
bool mgmtIsMaster() { return tsMnodeIsMaster; }
void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp) {
void *pNode = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
pNode = mgmtGetNextMnode(pNode, &pMnode);
if (pMnode == NULL) break;
if (usePublicIp) {
ipSet->ip[ipSet->numOfIps] = htonl(pMnode->pDnode->publicIp);
} else {
ipSet->ip[ipSet->numOfIps] = htonl(pMnode->pDnode->privateIp);
}
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
ipSet->inUse = ipSet->numOfIps;
}
ipSet->numOfIps++;
ipSet->port = htons(pMnode->pDnode->mnodeShellPort);
mgmtReleaseMnode(pMnode);
}
}
void mgmtGetMnodeList(void *param) {
SDMNodeInfos *mnodes = param;
mnodes->inUse = 0;
int32_t index = 0;
void *pNode = NULL;
while (1) {
SMnodeObj *pMnode = NULL;
pNode = mgmtGetNextMnode(pNode, &pMnode);
if (pMnode == NULL) break;
mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
mnodes->nodeInfos[index].nodeIp = htonl(pMnode->pDnode->privateIp);
mnodes->nodeInfos[index].nodePort = htons(pMnode->pDnode->mnodeDnodePort);
strcpy(mnodes->nodeInfos[index].nodeName, pMnode->pDnode->dnodeName);
mPrint("node:%d role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role));
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
mnodes->inUse = index;
mPrint("node:%d inUse:%d", pMnode->mnodeId, mnodes->inUse);
}
index++;
mgmtReleaseMnode(pMnode);
}
mnodes->nodeNum = index;
}
int32_t mgmtAddMnode(int32_t dnodeId) {
SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj));
pMnode->mnodeId = dnodeId;
pMnode->createdTime = taosGetTimestampMs();
SSdbOperDesc oper = {
.type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb,
.pObj = pMnode,
};
int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
tfree(pMnode);
code = TSDB_CODE_SDB_ERROR;
}
return code;
}
int32_t mgmtDropMnode(int32_t dnodeId) {
SMnodeObj *pMnode = sdbGetRow(tsMnodeSdb, &dnodeId);
if (pMnode == NULL) {
return TSDB_CODE_DNODE_NOT_EXIST;
}
SSdbOperDesc oper = {
.type = SDB_OPER_GLOBAL,
.table = tsMnodeSdb,
.pObj = pMnode
};
int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_SDB_ERROR;
}
sdbDecRef(tsMnodeSdb, pMnode);
return code;
}
static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL);
if (pUser == NULL) return 0; if (pUser == NULL) return 0;
...@@ -162,7 +295,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo ...@@ -162,7 +295,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
} }
pShow->numOfRows = mpeerGetMnodesNum(); pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL; pShow->pNode = NULL;
mgmtReleaseUser(pUser); mgmtReleaseUser(pUser);
...@@ -178,7 +311,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -178,7 +311,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
char ipstr[32]; char ipstr[32];
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pNode = mpeerGetNextMnode(pShow->pNode, &pMnode); pShow->pNode = mgmtGetNextMnode(pShow->pNode, &pMnode);
if (pMnode == NULL) break; if (pMnode == NULL) break;
cols = 0; cols = 0;
...@@ -187,12 +320,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -187,12 +320,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
*(int16_t *)pWrite = pMnode->mnodeId; *(int16_t *)pWrite = pMnode->mnodeId;
cols++; cols++;
tinet_ntoa(ipstr, pMnode->privateIp); tinet_ntoa(ipstr, pMnode->pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr); strcpy(pWrite, ipstr);
cols++; cols++;
tinet_ntoa(ipstr, pMnode->publicIp); tinet_ntoa(ipstr, pMnode->pDnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr); strcpy(pWrite, ipstr);
cols++; cols++;
...@@ -202,12 +335,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi ...@@ -202,12 +335,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role)); strcpy(pWrite, mgmtGetMnodeRoleStr(pMnode->role));
cols++; cols++;
numOfRows++; numOfRows++;
mpeerReleaseMnode(pMnode); mgmtReleaseMnode(pMnode);
} }
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtTable.h" #include "mgmtTable.h"
......
...@@ -14,17 +14,23 @@ ...@@ -14,17 +14,23 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tbalance.h" #include "os.h"
#include "trpc.h"
#include "treplica.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtMnode.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtVgroup.h" #include "mgmtVgroup.h"
#ifndef _VPEER #ifndef _SYNC
int32_t balanceInit() { return 0; }
void balanceCleanUp() {}
void balanceNotify() {}
int32_t balanceAllocVnodes(SVgObj *pVgroup) { int32_t replicaInit() { return TSDB_CODE_SUCCESS; }
void replicaCleanUp() {}
void replicaNotify() {}
void replicaReset() {}
int32_t replicaForwardReqToPeer(void *pHead) { return TSDB_CODE_SUCCESS; }
int32_t replicaAllocVnodes(SVgObj *pVgroup) {
void * pNode = NULL; void * pNode = NULL;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SDnodeObj *pSelDnode = NULL; SDnodeObj *pSelDnode = NULL;
......
...@@ -18,11 +18,12 @@ ...@@ -18,11 +18,12 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "treplica.h"
#include "tqueue.h" #include "tqueue.h"
#include "twal.h" #include "twal.h"
#include "hashint.h" #include "hashint.h"
#include "hashstr.h" #include "hashstr.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
typedef struct _SSdbTable { typedef struct _SSdbTable {
...@@ -131,7 +132,7 @@ int32_t sdbInit() { ...@@ -131,7 +132,7 @@ int32_t sdbInit() {
sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables);
mpeerUpdateSync(); replicaNotify();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -264,7 +265,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ ...@@ -264,7 +265,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_
tsSdbObj->version++; tsSdbObj->version++;
pHead->version = tsSdbObj->version; pHead->version = tsSdbObj->version;
code = mpeerForwardReqToPeer(pHead); code = replicaForwardReqToPeer(pHead);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pthread_mutex_unlock(&tsSdbObj->mutex); pthread_mutex_unlock(&tsSdbObj->mutex);
sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName,
......
...@@ -23,11 +23,10 @@ ...@@ -23,11 +23,10 @@
#include "dnode.h" #include "dnode.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "tbalance.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "tgrant.h" #include "tgrant.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -141,7 +140,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { ...@@ -141,7 +140,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
return; return;
} }
if (!mpeerIsMaster()) { if (!mgmtIsMaster()) {
// rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect());
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
...@@ -329,11 +328,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { ...@@ -329,11 +328,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
return; return;
} }
if (pMsg->usePublicIp) { mgmtGetMnodeIpList(&pHBRsp->ipList, pMsg->usePublicIp);
mpeerGetPublicIpList(&pHBRsp->ipList);
} else {
mpeerGetPrivateIpList(&pHBRsp->ipList);
}
/* /*
* TODO * TODO
...@@ -415,11 +410,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { ...@@ -415,11 +410,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->superAuth = pUser->superAuth;
if (pMsg->usePublicIp) { mgmtGetMnodeIpList(&pConnectRsp->ipList, pMsg->usePublicIp);
mpeerGetPublicIpList(&pConnectRsp->ipList);
} else {
mpeerGetPrivateIpList(&pConnectRsp->ipList);
}
connect_over: connect_over:
rpcRsp.code = code; rpcRsp.code = code;
......
...@@ -29,7 +29,7 @@ ...@@ -29,7 +29,7 @@
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "tgrant.h" #include "tgrant.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include "tutil.h" #include "tutil.h"
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "tgrant.h" #include "tgrant.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
#include "mgmtUser.h" #include "mgmtUser.h"
......
...@@ -17,14 +17,14 @@ ...@@ -17,14 +17,14 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tbalance.h"
#include "tsync.h" #include "tsync.h"
#include "treplica.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mnode.h" #include "mnode.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDClient.h" #include "mgmtDClient.h"
#include "mgmtDServer.h" #include "mgmtDServer.h"
#include "mpeer.h" #include "mgmtMnode.h"
#include "mgmtProfile.h" #include "mgmtProfile.h"
#include "mgmtSdb.h" #include "mgmtSdb.h"
#include "mgmtShell.h" #include "mgmtShell.h"
...@@ -244,7 +244,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { ...@@ -244,7 +244,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
strcpy(pVgroup->dbName, pDb->name); strcpy(pVgroup->dbName, pDb->name);
pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->numOfVnodes = pDb->cfg.replications;
pVgroup->createdTime = taosGetTimestampMs(); pVgroup->createdTime = taosGetTimestampMs();
if (balanceAllocVnodes(pVgroup) != 0) { if (replicaAllocVnodes(pVgroup) != 0) {
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes); mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
free(pVgroup); free(pVgroup);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册