diff --git a/cmake/define.inc b/cmake/define.inc index 5f17ee121633f4c1c4d65a5bd6241072989dbafb..9f56dc654dea949784e885a98fb5f78e36a30e7e 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -1,25 +1,18 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) -IF (TD_CLUSTER) - ADD_DEFINITIONS(-D_CLUSTER) -ENDIF () - -IF (TD_MPEER) - ADD_DEFINITIONS(-D_MPEER) -ENDIF () - -IF (TD_VPEER) - ADD_DEFINITIONS(-D_VPEER) - #ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=3) -ELSE () - #ADD_DEFINITIONS(-DTSDB_REPLICA_MAX_NUM=1) +IF (TD_SYNC) + ADD_DEFINITIONS(-D_SYNC) ENDIF () IF (TD_ACCOUNT) ADD_DEFINITIONS(-D_ACCOUNT) ENDIF () +IF (TD_ADMIN) + ADD_DEFINITIONS(-D_ADMIN) +ENDIF () + IF (TD_GRANT) ADD_DEFINITIONS(-D_GRANT) ENDIF () diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 899977061832bf1ba4c76ba2d802d3e1c619c1fd..5735e1a8c18677f7323db6942d115fca681ab105 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -21,20 +21,13 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) IF (TD_ACCOUNT) TARGET_LINK_LIBRARIES(taosd account) ENDIF () + IF (TD_GRANT) TARGET_LINK_LIBRARIES(taosd grant) ENDIF () - IF (TD_CLUSTER) - TARGET_LINK_LIBRARIES(taosd cluster) - ENDIF () - - IF (TD_VPEER) - TARGET_LINK_LIBRARIES(taosd balance sync) - ENDIF () - - IF (TD_MPEER) - TARGET_LINK_LIBRARIES(taosd mpeer sync) + IF (TD_SYNC) + TARGET_LINK_LIBRARIES(taosd replica sync) ENDIF () SET(PREPARE_ENV_CMD "prepare_env_cmd") diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 85454af095aed7782c4fbbd3f2f3be0bf3210024..460c866ce17195e0a2ba0dd0bc4a4d838db219fb 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -23,12 +23,12 @@ #include "tsync.h" #include "ttime.h" #include "ttimer.h" +#include "treplica.h" #include "dnode.h" #include "dnodeMClient.h" #include "dnodeModule.h" #include "dnodeMgmt.h" #include "vnode.h" -#include "mpeer.h" #define MPEER_CONTENT_LEN 2000 @@ -181,7 +181,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { tsMnodeInfos.nodeInfos[i].nodeName); } dnodeSaveMnodeIpList(); - mpeerUpdateSync(); + replicaNotify(); } taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); diff --git a/src/inc/mnode.h b/src/inc/mnode.h index a5817ac9df69049c49becf1a79fec276147ab40a..cbd768c29562150451ec18ed5355c807a5d91ddd 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -43,19 +43,6 @@ struct _acct_obj; struct _user_obj; struct _mnode_obj; -typedef struct _mnode_obj { - int32_t mnodeId; - int64_t createdTime; - int8_t reserved[14]; - int8_t updateEnd[1]; - int32_t refCount; - uint32_t privateIp; - uint32_t publicIp; - uint16_t port; - int8_t role; - char mnodeName[TSDB_NODE_NAME_LEN + 1]; -} SMnodeObj; - typedef struct _dnode_obj { int32_t dnodeId; uint32_t privateIp; @@ -88,6 +75,17 @@ typedef struct _dnode_obj { int16_t bandwidthUsage; // calc from sys.band } SDnodeObj; +typedef struct _mnode_obj { + int32_t mnodeId; + int64_t createdTime; + int8_t reserved[14]; + int8_t updateEnd[1]; + int32_t refCount; + int8_t role; + SDnodeObj *pDnode; +} SMnodeObj; + + typedef struct { int32_t dnodeId; uint32_t privateIp; diff --git a/src/inc/tbalance.h b/src/inc/treplica.h similarity index 68% rename from src/inc/tbalance.h rename to src/inc/treplica.h index c73d6a91a93bb03fc188fa72889d4e8a23c6cc85..b8915d64a04ea53b2eae49b788bf575b5f774be3 100644 --- a/src/inc/tbalance.h +++ b/src/inc/treplica.h @@ -13,27 +13,23 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_BALANCE_H -#define TDENGINE_BALANCE_H +#ifndef TDENGINE_REPLICA_H +#define TDENGINE_REPLICA_H #ifdef __cplusplus extern "C" { #endif -#include -#include -#include - -struct _db_obj; struct _vg_obj; struct _dnode_obj; -int32_t balanceInit(); -void balanceCleanUp(); -void balanceNotify(); -void balanceReset(); -int32_t balanceAllocVnodes(struct _vg_obj *pVgroup); -int32_t balanceDropDnode(struct _dnode_obj *pDnode); +int32_t replicaInit(); +void replicaCleanUp(); +void replicaNotify(); +void replicaReset(); +int32_t replicaAllocVnodes(struct _vg_obj *pVgroup); +int32_t replicaForwardReqToPeer(void *pHead); +int32_t replicaDropDnode(struct _dnode_obj *pDnode); #ifdef __cplusplus } diff --git a/src/inc/mpeer.h b/src/mnode/inc/mgmtMnode.h similarity index 53% rename from src/inc/mpeer.h rename to src/mnode/inc/mgmtMnode.h index ba1b7d32cf27a22540d3d3cbfd7388c617e24880..d89ebb0c0111ef17e47f91f663d9d910b5a32de8 100644 --- a/src/inc/mpeer.h +++ b/src/mnode/inc/mgmtMnode.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_MPEER_H -#define TDENGINE_MPEER_H +#ifndef TDENGINE_MGMT_MNODE_H +#define TDENGINE_MGMT_MNODE_H #ifdef __cplusplus extern "C" { @@ -28,29 +28,21 @@ enum _TAOS_MN_STATUS { TAOS_MN_STATUS_READY }; -// general implementation -int32_t mpeerInit(); -void mpeerCleanup(); +int32_t mgmtInitMnodes(); +void mgmtCleanupMnodes(); -// special implementation -int32_t mpeerInitMnodes(); -void mpeerCleanupMnodes(); -int32_t mpeerAddMnode(int32_t dnodeId); -int32_t mpeerRemoveMnode(int32_t dnodeId); +int32_t mgmtAddMnode(int32_t dnodeId); +int32_t mgmtDropMnode(int32_t dnodeId); -void * mpeerGetMnode(int32_t mnodeId); -int32_t mpeerGetMnodesNum(); -void * mpeerGetNextMnode(void *pNode, struct _mnode_obj **pMnode); -void mpeerReleaseMnode(struct _mnode_obj *pMnode); +void * mgmtGetMnode(int32_t mnodeId); +int32_t mgmtGetMnodesNum(); +void * mgmtGetNextMnode(void *pNode, struct _mnode_obj **pMnode); +void mgmtReleaseMnode(struct _mnode_obj *pMnode); -bool mpeerIsMaster(); +bool mgmtIsMaster(); -void mpeerGetPrivateIpList(SRpcIpSet *ipSet); -void mpeerGetPublicIpList(SRpcIpSet *ipSet); -void mpeerGetMpeerInfos(void *mpeers); - -int32_t mpeerForwardReqToPeer(void *pHead); -void mpeerUpdateSync(); +void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp); +void mgmtGetMnodeList(void *mpeers); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c index 699a1551d4be111e17692362f6eab393a720d00d..8552519e02d6aaa05534815a2c86c6cb25b1af61 100644 --- a/src/mnode/src/mgmtDClient.c +++ b/src/mnode/src/mgmtDClient.c @@ -21,7 +21,7 @@ #include "tutil.h" #include "dnode.h" #include "mnode.h" -#include "tbalance.h" +#include "mgmtMnode.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "tgrant.h" diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtDServer.c index 0ac93d429cae4768b27894737990a7fef88de4ba..e4c5e797b3f551888c3a1177f94f22648595adcd 100644 --- a/src/mnode/src/mgmtDServer.c +++ b/src/mnode/src/mgmtDServer.c @@ -22,7 +22,7 @@ #include "tutil.h" #include "dnode.h" #include "mnode.h" -#include "tbalance.h" +#include "treplica.h" #include "mgmtDb.h" #include "mgmtDServer.h" #include "tgrant.h" diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index 99bcc365aeb3c31877996da61b3173a5c78c4cfe..5b77e6b60e955a3d317bbe9e007343c4213af4ce 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -19,12 +19,11 @@ #include "tutil.h" #include "name.h" #include "mnode.h" -#include "tbalance.h" #include "mgmtAcct.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "tgrant.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtShell.h" #include "mgmtProfile.h" #include "mgmtSdb.h" diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 97e9a89c8c53cd367454afc0a28af1a6f14c7819..17212806a99f2e5d660f0c1ff3f70980fac710b6 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -16,13 +16,13 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tmodule.h" -#include "tbalance.h" #include "tgrant.h" -#include "mgmtDnode.h" +#include "treplica.h" #include "mnode.h" -#include "mpeer.h" #include "mgmtDClient.h" #include "mgmtDServer.h" +#include "mgmtDnode.h" +#include "mgmtMnode.h" #include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" @@ -119,13 +119,15 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) { static int32_t mgmtDnodeActionRestored() { int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb); - if (numOfRows <= 0) { - if (strcmp(tsMasterIp, tsPrivateIp) == 0) { - mgmtCreateDnode(inet_addr(tsPrivateIp)); - } + if (numOfRows <= 0 && strcmp(tsMasterIp, tsPrivateIp) == 0) { + uint32_t ip = inet_addr(tsPrivateIp); + mgmtCreateDnode(ip); + SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); + mgmtAddMnode(pDnode->dnodeId); + mgmtReleaseDnode(pDnode); } - return 0; + return TSDB_CODE_SUCCESS; } int32_t mgmtInitDnodes() { @@ -326,7 +328,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; - balanceNotify(); + replicaNotify(); mgmtMonitorDnodeModule(); } @@ -339,7 +341,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { return; } - mpeerGetMpeerInfos(&pRsp->mpeers); + mgmtGetMnodeList(&pRsp->mpeers); pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId); pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus); @@ -417,7 +419,7 @@ int32_t mgmtDropDnode(SDnodeObj *pDnode) { return code; } -static int32_t clusterDropDnodeByIp(uint32_t ip) { +static int32_t mgmtDropDnodeByIp(uint32_t ip) { SDnodeObj *pDnode = mgmtGetDnodeByIp(ip); if (pDnode == NULL) { mError("dnode:%s, is not exist", taosIpStr(ip)); @@ -465,7 +467,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) { rpcRsp.code = TSDB_CODE_NO_RIGHTS; } else { uint32_t ip = inet_addr(pDrop->ip); - rpcRsp.code = clusterDropDnodeByIp(ip); + rpcRsp.code = mgmtDropDnodeByIp(ip); if (rpcRsp.code == TSDB_CODE_SUCCESS) { mLPrint("dnode:%s is dropped by %s", pDrop->ip, pMsg->pUser->user); } else { @@ -709,7 +711,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo return numOfRows; } -static bool clusterCheckConfigShow(SGlobalConfig *cfg) { +static bool mgmtCheckConfigShow(SGlobalConfig *cfg) { if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_SHOW)) return false; return true; @@ -746,7 +748,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->numOfRows = 0; for (int32_t i = tsGlobalConfigNum - 1; i >= 0; --i) { SGlobalConfig *cfg = tsGlobalConfig + i; - if (!clusterCheckConfigShow(cfg)) continue; + if (!mgmtCheckConfigShow(cfg)) continue; pShow->numOfRows++; } @@ -762,7 +764,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo for (int32_t i = tsGlobalConfigNum - 1; i >= 0 && numOfRows < rows; --i) { SGlobalConfig *cfg = tsGlobalConfig + i; - if (!clusterCheckConfigShow(cfg)) continue; + if (!mgmtCheckConfigShow(cfg)) continue; char *pWrite; int32_t cols = 0; @@ -924,7 +926,7 @@ static void clusterSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { mgmtUpdateDnode(pDnode); if (moduleType == TSDB_MOD_MGMT) { - mpeerAddMnode(pDnode->dnodeId); + mgmtAddMnode(pDnode->dnodeId); mPrint("dnode:%d, add it into mnode list", pDnode->dnodeId); } } @@ -934,7 +936,7 @@ static void clusterUnSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { mgmtUpdateDnode(pDnode); if (moduleType == TSDB_MOD_MGMT) { - mpeerRemoveMnode(pDnode->dnodeId); + mgmtDropMnode(pDnode->dnodeId); mPrint("dnode:%d, remove it from mnode list", pDnode->dnodeId); } } diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c index 46721f4834a531f575b7ef88841c5a4496d4121a..a4ce22f15845d441d7bd0dc2924c504611c37f92 100644 --- a/src/mnode/src/mgmtMain.c +++ b/src/mnode/src/mgmtMain.c @@ -20,10 +20,10 @@ #include "tsched.h" #include "mnode.h" #include "mgmtAcct.h" -#include "tbalance.h" +#include "treplica.h" #include "mgmtDnode.h" #include "tgrant.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtDServer.h" @@ -109,7 +109,7 @@ int32_t mgmtStartSystem() { return -1; } - if (mpeerInit() < 0) { + if (mgmtInitMnodes() < 0) { mError("failed to init mpeers"); return -1; } @@ -127,7 +127,7 @@ int32_t mgmtStartSystem() { return -1; } - if (balanceInit() < 0) { + if (replicaInit() < 0) { mError("failed to init dnode balance") } @@ -140,7 +140,7 @@ int32_t mgmtStartSystem() { void mgmtStopSystem() { - if (mpeerIsMaster()) { + if (mgmtIsMaster()) { mTrace("it is a master mgmt node, it could not be stopped"); return; } @@ -152,8 +152,8 @@ void mgmtStopSystem() { void mgmtCleanUpSystem() { mPrint("starting to clean up mgmt"); grantCleanUp(); - mpeerCleanup(); - balanceCleanUp(); + mgmtCleanupMnodes(); + replicaCleanUp(); mgmtCleanUpShell(); mgmtCleanupDClient(); mgmtCleanupDServer(); diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index e2edb201b99a88189b6d0da32f9f1af4520084bf..eff6e8e01ac26055ddc593ab24c376373a16899f 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -16,91 +16,129 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" +#include "tmodule.h" #include "trpc.h" #include "tsync.h" -#include "mpeer.h" +#include "treplica.h" +#include "mnode.h" +#include "mgmtMnode.h" +#include "mgmtDnode.h" +#include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" +static void * tsMnodeSdb = NULL; +static int32_t tsMnodeUpdateSize = 0; +static int32_t tsMnodeIsMaster = true; static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); -#ifndef _MPEER - -static SMnodeObj tsMnodeObj = {0}; +static int32_t mgmtMnodeActionDestroy(SSdbOperDesc *pOper) { + tfree(pOper->pObj); + return TSDB_CODE_SUCCESS; +} -int32_t mpeerInitMnodes() { - tsMnodeObj.mnodeId = 1; - tsMnodeObj.privateIp = inet_addr(tsPrivateIp); - tsMnodeObj.publicIp = inet_addr(tsPublicIp); - tsMnodeObj.createdTime = taosGetTimestampMs(); - tsMnodeObj.role = TAOS_SYNC_ROLE_MASTER; - tsMnodeObj.port = tsMnodeDnodePort; - sprintf(tsMnodeObj.mnodeName, "m%d", tsMnodeObj.mnodeId); +static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) { + SMnodeObj *pMnode = pOper->pObj; + SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId); + if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST; + pMnode->pDnode = pDnode; + mgmtReleaseDnode(pDnode); return TSDB_CODE_SUCCESS; } -void mpeerCleanupMnodes() {} -int32_t mpeerAddMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } -int32_t mpeerRemoveMnode(int32_t dnodeId) { return TSDB_CODE_SUCCESS; } -void * mpeerGetMnode(int32_t mnodeId) { return &tsMnodeObj; } -int32_t mpeerGetMnodesNum() { return 1; } -void mpeerReleaseMnode(struct _mnode_obj *pMnode) {} -bool mpeerIsMaster() { return tsMnodeObj.role == TAOS_SYNC_ROLE_MASTER; } -void mpeerUpdateSync() {} - -void *mpeerGetNextMnode(void *pNode, SMnodeObj **pMnode) { - if (*pMnode == NULL) { - *pMnode = &tsMnodeObj; - } else { - *pMnode = NULL; +static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) { + SMnodeObj *pMnode = pOper->pObj; + mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId); + return TSDB_CODE_SUCCESS; +} + +static int32_t mgmtMnodeActionUpdate(SSdbOperDesc *pOper) { + SMnodeObj *pMnode = pOper->pObj; + SMnodeObj *pSaved = mgmtGetMnode(pMnode->mnodeId); + if (pMnode != pSaved) { + memcpy(pSaved, pMnode, pOper->rowSize); + free(pMnode); } - return *pMnode; + return TSDB_CODE_SUCCESS; } -void mpeerGetPrivateIpList(SRpcIpSet *ipSet) { - ipSet->inUse = 0; - ipSet->numOfIps = 1; - ipSet->port = htons(tsMnodeObj.port); - ipSet->ip[0] = htonl(tsMnodeObj.privateIp); +static int32_t mgmtMnodeActionEncode(SSdbOperDesc *pOper) { + SMnodeObj *pMnode = pOper->pObj; + memcpy(pOper->rowData, pMnode, tsMnodeUpdateSize); + pOper->rowSize = tsMnodeUpdateSize; + return TSDB_CODE_SUCCESS; } -void mpeerGetPublicIpList(SRpcIpSet *ipSet) { - ipSet->inUse = 0; - ipSet->numOfIps = 1; - ipSet->port = htons(tsMnodeObj.port); - ipSet->ip[0] = htonl(tsMnodeObj.publicIp); -} +static int32_t mgmtMnodeActionDecode(SSdbOperDesc *pOper) { + SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); + if (pMnode == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; -void mpeerGetMpeerInfos(void *param) { - SDMNodeInfos *mpeers = param; - mpeers->inUse = 0; - mpeers->nodeNum = 1; - mpeers->nodeInfos[0].nodeId = htonl(tsMnodeObj.mnodeId); - mpeers->nodeInfos[0].nodeIp = htonl(tsMnodeObj.privateIp); - mpeers->nodeInfos[0].nodePort = htons(tsMnodeObj.port); - strcpy(mpeers->nodeInfos[0].nodeName, tsMnodeObj.mnodeName); + memcpy(pMnode, pOper->rowData, tsMnodeUpdateSize); + pOper->pObj = pMnode; + return TSDB_CODE_SUCCESS; } -int32_t mpeerForwardReqToPeer(void *pHead) { +static int32_t mgmtMnodeActionRestored() { return TSDB_CODE_SUCCESS; } -#endif +int32_t mgmtInitMnodes() { + SMnodeObj tObj; + tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; + + SSdbTableDesc tableDesc = { + .tableId = SDB_TABLE_MNODE, + .tableName = "mnodes", + .hashSessions = TSDB_MAX_MNODES, + .maxRowSize = tsMnodeUpdateSize, + .refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj, + .keyType = SDB_KEY_INT, + .insertFp = mgmtMnodeActionInsert, + .deleteFp = mgmtMnodeActionDelete, + .updateFp = mgmtMnodeActionUpdate, + .encodeFp = mgmtMnodeActionEncode, + .decodeFp = mgmtMnodeActionDecode, + .destroyFp = mgmtMnodeActionDestroy, + .restoredFp = mgmtMnodeActionRestored + }; + + tsMnodeSdb = sdbOpenTable(&tableDesc); + if (tsMnodeSdb == NULL) { + mError("failed to init mnodes data"); + return -1; + } -int32_t mpeerInit() { mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MNODE, mgmtGetMnodeMeta); mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MNODE, mgmtRetrieveMnodes); - return mpeerInitMnodes(); + + mTrace("mnodes table is created"); + return TSDB_CODE_SUCCESS; +} + +void mgmtCleanupMnodes() { + sdbCloseTable(tsMnodeSdb); +} + +int32_t mgmtGetMnodesNum() { + return sdbGetNumOfRows(tsMnodeSdb); +} + +void *mgmtGetMnode(int32_t mnodeId) { + return sdbGetRow(tsMnodeSdb, &mnodeId); +} + +void mgmtReleaseMnode(struct _mnode_obj *pMnode) { + sdbDecRef(tsMnodeSdb, pMnode); } -void mpeerCleanup() { - mpeerCleanupMnodes(); +void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) { + return sdbFetchRow(tsMnodeSdb, pNode, (void **)pMnode); } -static char *mpeerGetMnodeRoleStr(int32_t role) { +static char *mgmtGetMnodeRoleStr(int32_t role) { switch (role) { case TAOS_SYNC_ROLE_OFFLINE: return "offline"; @@ -115,6 +153,101 @@ static char *mpeerGetMnodeRoleStr(int32_t role) { } } +bool mgmtIsMaster() { return tsMnodeIsMaster; } + +void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp) { + void *pNode = NULL; + while (1) { + SMnodeObj *pMnode = NULL; + pNode = mgmtGetNextMnode(pNode, &pMnode); + if (pMnode == NULL) break; + + if (usePublicIp) { + ipSet->ip[ipSet->numOfIps] = htonl(pMnode->pDnode->publicIp); + } else { + ipSet->ip[ipSet->numOfIps] = htonl(pMnode->pDnode->privateIp); + } + + if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { + ipSet->inUse = ipSet->numOfIps; + } + + ipSet->numOfIps++; + ipSet->port = htons(pMnode->pDnode->mnodeShellPort); + + mgmtReleaseMnode(pMnode); + } +} + +void mgmtGetMnodeList(void *param) { + SDMNodeInfos *mnodes = param; + mnodes->inUse = 0; + + int32_t index = 0; + void *pNode = NULL; + while (1) { + SMnodeObj *pMnode = NULL; + pNode = mgmtGetNextMnode(pNode, &pMnode); + if (pMnode == NULL) break; + + mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId); + mnodes->nodeInfos[index].nodeIp = htonl(pMnode->pDnode->privateIp); + mnodes->nodeInfos[index].nodePort = htons(pMnode->pDnode->mnodeDnodePort); + strcpy(mnodes->nodeInfos[index].nodeName, pMnode->pDnode->dnodeName); + mPrint("node:%d role:%s", pMnode->mnodeId, mgmtGetMnodeRoleStr(pMnode->role)); + if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { + mnodes->inUse = index; + mPrint("node:%d inUse:%d", pMnode->mnodeId, mnodes->inUse); + } + + index++; + mgmtReleaseMnode(pMnode); + } + + mnodes->nodeNum = index; +} + +int32_t mgmtAddMnode(int32_t dnodeId) { + SMnodeObj *pMnode = calloc(1, sizeof(SMnodeObj)); + pMnode->mnodeId = dnodeId; + pMnode->createdTime = taosGetTimestampMs(); + + SSdbOperDesc oper = { + .type = SDB_OPER_GLOBAL, + .table = tsMnodeSdb, + .pObj = pMnode, + }; + + int32_t code = sdbInsertRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + tfree(pMnode); + code = TSDB_CODE_SDB_ERROR; + } + + return code; +} + +int32_t mgmtDropMnode(int32_t dnodeId) { + SMnodeObj *pMnode = sdbGetRow(tsMnodeSdb, &dnodeId); + if (pMnode == NULL) { + return TSDB_CODE_DNODE_NOT_EXIST; + } + + SSdbOperDesc oper = { + .type = SDB_OPER_GLOBAL, + .table = tsMnodeSdb, + .pObj = pMnode + }; + + int32_t code = sdbDeleteRow(&oper); + if (code != TSDB_CODE_SUCCESS) { + code = TSDB_CODE_SDB_ERROR; + } + + sdbDecRef(tsMnodeSdb, pMnode); + return code; +} + static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { SUserObj *pUser = mgmtGetUserFromConn(pConn, NULL); if (pUser == NULL) return 0; @@ -162,7 +295,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; } - pShow->numOfRows = mpeerGetMnodesNum(); + pShow->numOfRows = mgmtGetMnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; mgmtReleaseUser(pUser); @@ -178,7 +311,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi char ipstr[32]; while (numOfRows < rows) { - pShow->pNode = mpeerGetNextMnode(pShow->pNode, &pMnode); + pShow->pNode = mgmtGetNextMnode(pShow->pNode, &pMnode); if (pMnode == NULL) break; cols = 0; @@ -187,12 +320,12 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi *(int16_t *)pWrite = pMnode->mnodeId; cols++; - tinet_ntoa(ipstr, pMnode->privateIp); + tinet_ntoa(ipstr, pMnode->pDnode->privateIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, ipstr); cols++; - tinet_ntoa(ipstr, pMnode->publicIp); + tinet_ntoa(ipstr, pMnode->pDnode->publicIp); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; strcpy(pWrite, ipstr); cols++; @@ -202,15 +335,15 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, mpeerGetMnodeRoleStr(pMnode->role)); + strcpy(pWrite, mgmtGetMnodeRoleStr(pMnode->role)); cols++; numOfRows++; - mpeerReleaseMnode(pMnode); + mgmtReleaseMnode(pMnode); } pShow->numOfReads += numOfRows; return numOfRows; -} \ No newline at end of file +} diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index db1b764ca7492c85aff10e0cee4fc67c816b5c76..eb9efc8e5df59ba9762ac6f0e72ff73990795b4e 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -19,7 +19,7 @@ #include "mgmtAcct.h" #include "mgmtDnode.h" #include "mgmtDb.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtShell.h" #include "mgmtTable.h" diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtReplica.c similarity index 82% rename from src/mnode/src/mgmtBalance.c rename to src/mnode/src/mgmtReplica.c index 0b9e025acf1af7e0b36b725cfe22e8ddf7dbcaf6..bb906fa5ec138d02fb0796cfbadf9a0dc5d68619 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtReplica.c @@ -14,17 +14,23 @@ */ #define _DEFAULT_SOURCE -#include "tbalance.h" +#include "os.h" +#include "trpc.h" +#include "treplica.h" #include "mnode.h" +#include "mgmtMnode.h" #include "mgmtDnode.h" #include "mgmtVgroup.h" -#ifndef _VPEER -int32_t balanceInit() { return 0; } -void balanceCleanUp() {} -void balanceNotify() {} +#ifndef _SYNC -int32_t balanceAllocVnodes(SVgObj *pVgroup) { +int32_t replicaInit() { return TSDB_CODE_SUCCESS; } +void replicaCleanUp() {} +void replicaNotify() {} +void replicaReset() {} +int32_t replicaForwardReqToPeer(void *pHead) { return TSDB_CODE_SUCCESS; } + +int32_t replicaAllocVnodes(SVgObj *pVgroup) { void * pNode = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 12fab5587550159d8095dd4d94b765b47105fac2..7fdac8fd6085c5fe25cac84d8a40ffecbeb54090 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -18,11 +18,12 @@ #include "taoserror.h" #include "tlog.h" #include "trpc.h" +#include "treplica.h" #include "tqueue.h" #include "twal.h" #include "hashint.h" #include "hashstr.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtSdb.h" typedef struct _SSdbTable { @@ -131,7 +132,7 @@ int32_t sdbInit() { sdbTrace("sdb is initialized, version:%d totalRows:%d numOfTables:%d", tsSdbObj->version, totalRows, numOfTables); - mpeerUpdateSync(); + replicaNotify(); return TSDB_CODE_SUCCESS; } @@ -264,7 +265,7 @@ static int32_t sdbProcessWriteFromApp(SSdbTable *pTable, SWalHead *pHead, int32_ tsSdbObj->version++; pHead->version = tsSdbObj->version; - code = mpeerForwardReqToPeer(pHead); + code = replicaForwardReqToPeer(pHead); if (code != TSDB_CODE_SUCCESS) { pthread_mutex_unlock(&tsSdbObj->mutex); sdbError("table:%s, failed to forward %s record:%s from file, version:%" PRId64 ", reason:%s", pTable->tableName, diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 82eb2bae1e0f6e15f690b3e00ac796092e565dd3..8633a6359ef37af768a1c00fb27144db00fcabc4 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -23,11 +23,10 @@ #include "dnode.h" #include "mnode.h" #include "mgmtAcct.h" -#include "tbalance.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "tgrant.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtShell.h" @@ -141,7 +140,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { return; } - if (!mpeerIsMaster()) { + if (!mgmtIsMaster()) { // rpcSendRedirectRsp(rpcMsg->handle, mgmtGetMnodeIpListForRedirect()); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NO_MASTER); rpcFreeCont(rpcMsg->pCont); @@ -329,12 +328,8 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) { return; } - if (pMsg->usePublicIp) { - mpeerGetPublicIpList(&pHBRsp->ipList); - } else { - mpeerGetPrivateIpList(&pHBRsp->ipList); - } - + mgmtGetMnodeIpList(&pHBRsp->ipList, pMsg->usePublicIp); + /* * TODO * Dispose kill stream or kill query message @@ -415,12 +410,8 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) { pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->superAuth = pUser->superAuth; - if (pMsg->usePublicIp) { - mpeerGetPublicIpList(&pConnectRsp->ipList); - } else { - mpeerGetPrivateIpList(&pConnectRsp->ipList); - } - + mgmtGetMnodeIpList(&pConnectRsp->ipList, pMsg->usePublicIp); + connect_over: rpcRsp.code = code; if (code != TSDB_CODE_SUCCESS) { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 65dfb06ad5ecb76321f45707adbb1ba33d1f2b0d..dff6f9720934e02cafcb0363bea9f7d5e07a6b45 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -29,7 +29,7 @@ #include "mgmtDnode.h" #include "mgmtDServer.h" #include "tgrant.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtShell.h" diff --git a/src/mnode/src/mgmtUser.c b/src/mnode/src/mgmtUser.c index 7a7f2999b3ad9c7bc5f0419896e30b6458f922dd..3077f0005df436052e0f9d7980b1020f96e2f8dd 100644 --- a/src/mnode/src/mgmtUser.c +++ b/src/mnode/src/mgmtUser.c @@ -20,7 +20,7 @@ #include "tutil.h" #include "mgmtAcct.h" #include "tgrant.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtSdb.h" #include "mgmtShell.h" #include "mgmtUser.h" diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index ee9afd9586e1406de62e0adab4318c7790a431c1..0af2af93ab17453913278ce2e7c9034e9fd1ad48 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -17,14 +17,14 @@ #include "os.h" #include "taoserror.h" #include "tlog.h" -#include "tbalance.h" #include "tsync.h" +#include "treplica.h" #include "mgmtDnode.h" #include "mnode.h" #include "mgmtDb.h" #include "mgmtDClient.h" #include "mgmtDServer.h" -#include "mpeer.h" +#include "mgmtMnode.h" #include "mgmtProfile.h" #include "mgmtSdb.h" #include "mgmtShell.h" @@ -244,7 +244,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) { strcpy(pVgroup->dbName, pDb->name); pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->createdTime = taosGetTimestampMs(); - if (balanceAllocVnodes(pVgroup) != 0) { + if (replicaAllocVnodes(pVgroup) != 0) { mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes); free(pVgroup); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_ENOUGH_DNODES);