提交 54644652 编写于 作者: S Shengliang Guan

mnode monitor

上级 2bf46ddc
......@@ -85,6 +85,11 @@ typedef struct {
ESyncState state;
} SSyncMgmt;
typedef struct {
int64_t expireTimeMS;
int64_t timeseriesAllowed;
} SGrantInfo;
typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId;
......@@ -105,6 +110,7 @@ typedef struct SMnode {
STelemMgmt telemMgmt;
SSyncMgmt syncMgmt;
SHashObj *infosMeta;
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SendReqToDnodeFp sendReqToDnodeFp;
SendReqToMnodeFp sendReqToMnodeFp;
......@@ -120,7 +126,7 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len);
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
#ifdef __cplusplus
}
......
......@@ -24,6 +24,7 @@ extern "C" {
int32_t mndInitProfile(SMnode *pMnode);
void mndCleanupProfile(SMnode *pMnode);
int32_t mndGetNumOfConnections(SMnode *pMnode);
#ifdef __cplusplus
}
......
......@@ -99,11 +99,15 @@ void mndUpdateMnodeRole(SMnode *pMnode) {
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
ESyncRole lastRole = pObj->role;
if (pObj->id == 1) {
pObj->role = TAOS_SYNC_STATE_LEADER;
} else {
pObj->role = TAOS_SYNC_STATE_CANDIDATE;
}
if (pObj->role != lastRole) {
pObj->roleTime = taosGetTimestampMs();
}
sdbRelease(pSdb, pObj);
}
......
......@@ -916,3 +916,5 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}
int32_t mndGetNumOfConnections(SMnode *pMnode) { return taosHashGetSize(pMnode->profileMgmt.cache->pHashTable); }
\ No newline at end of file
......@@ -18,15 +18,15 @@
#include "mndCluster.h"
#include "mndSync.h"
#include "tbuffer.h"
#include "tjson.h"
#include "thttp.h"
#include "tjson.h"
#define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80
static void mndBuildRuntimeInfo(SMnode* pMnode, SJson* pJson) {
SMnodeLoad load = {0};
if (mndGetLoad(pMnode, &load) != 0) return;
mndGetLoad(pMnode, &load);
tjsonAddDoubleToObject(pJson, "numOfDnode", load.numOfDnode);
tjsonAddDoubleToObject(pJson, "numOfMnode", load.numOfMnode);
......
......@@ -359,6 +359,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
return NULL;
}
mndUpdateMnodeRole(pMnode);
mDebug("mnode open successfully ");
return pMnode;
}
......@@ -385,26 +386,6 @@ void mndDestroy(const char *path) {
mDebug("mnode is destroyed");
}
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
pLoad->numOfDnode = 0;
pLoad->numOfMnode = 0;
pLoad->numOfVgroup = 0;
pLoad->numOfDatabase = 0;
pLoad->numOfSuperTable = 0;
pLoad->numOfChildTable = 0;
pLoad->numOfColumn = 0;
pLoad->totalPoints = 0;
pLoad->totalStorage = 0;
pLoad->compStorage = 0;
return 0;
}
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
return 0;
}
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
if (pMsg == NULL) {
......@@ -523,3 +504,129 @@ uint64_t mndGenerateUid(char *name, int32_t len) {
}
} while (true);
}
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
memset(pLoad, 0, sizeof(SMnodeLoad));
SSdb *pSdb = pMnode->pSdb;
pLoad->numOfDnode = sdbGetSize(pSdb, SDB_DNODE);
pLoad->numOfMnode = sdbGetSize(pSdb, SDB_MNODE);
pLoad->numOfVgroup = sdbGetSize(pSdb, SDB_VGROUP);
pLoad->numOfDatabase = sdbGetSize(pSdb, SDB_DB);
pLoad->numOfSuperTable = sdbGetSize(pSdb, SDB_STB);
void *pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
pLoad->numOfChildTable += pVgroup->numOfTables;
pLoad->numOfColumn += pVgroup->numOfTimeSeries;
pLoad->totalPoints += pVgroup->pointsWritten;
pLoad->totalStorage += pVgroup->totalStorage;
pLoad->compStorage += pVgroup->compStorage;
sdbRelease(pSdb, pVgroup);
}
}
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SSdb *pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs();
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
return -1;
}
// cluster info
tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
pClusterInfo->monitor_interval = tsMonitorInterval;
pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
void *pIter = NULL;
while (1) {
SDnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SMonDnodeDesc desc = {0};
desc.dnode_id = pObj->id;
tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
if (mndIsDnodeOnline(pMnode, pObj, ms)) {
tstrncpy(desc.status, "ready", sizeof(desc.status));
} else {
tstrncpy(desc.status, "offline", sizeof(desc.status));
}
taosArrayPush(pClusterInfo->dnodes, &desc);
sdbRelease(pSdb, pObj);
}
pIter = NULL;
while (1) {
SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SMonMnodeDesc desc = {0};
desc.mnode_id = pObj->id;
tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
tstrncpy(desc.role, mndGetRoleStr(pObj->role), sizeof(desc.role));
taosArrayPush(pClusterInfo->mnodes, &desc);
sdbRelease(pSdb, pObj);
if (pObj->role == TAOS_SYNC_STATE_LEADER) {
pClusterInfo->first_ep_dnode_id = pObj->id;
tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f);
}
}
// vgroup info
pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
pClusterInfo->vgroups_total++;
SMonVgroupDesc desc = {0};
desc.vgroup_id = pVgroup->vgId;
strncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name));
desc.tables_num = pVgroup->numOfTables;
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
for (int32_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
pVnDesc->dnode_id = pVgid->dnodeId;
tstrncpy(pVnDesc->vnode_role, mndGetRoleStr(pVgid->role), sizeof(pVnDesc->vnode_role));
if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
tstrncpy(desc.status, "ready", sizeof(desc.status));
pClusterInfo->vgroups_alive++;
}
if (pVgid->role == TAOS_SYNC_STATE_LEADER || pVgid->role == TAOS_SYNC_STATE_CANDIDATE) {
pClusterInfo->vnodes_alive++;
}
pClusterInfo->vnodes_total++;
}
taosArrayPush(pVgroupInfo->vgroups, &desc);
sdbRelease(pSdb, pVgroup);
}
// grant info
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
if (pMnode->grant.expireTimeMS == 0) {
pGrantInfo->expire_time = INT32_MAX;
pGrantInfo->timeseries_total = INT32_MAX;
}
return 0;
}
\ No newline at end of file
......@@ -128,11 +128,11 @@ void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) {
SJson *pMnodesJson = tjsonAddArrayToObject(pJson, "mnodes");
if (pMnodesJson == NULL) return;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->dnodes); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->mnodes); ++i) {
SJson *pMnodeJson = tjsonCreateObject();
if (pMnodeJson == NULL) continue;
SMonMnodeDesc *pMnodeDesc = taosArrayGet(pInfo->dnodes, i);
SMonMnodeDesc *pMnodeDesc = taosArrayGet(pInfo->mnodes, i);
tjsonAddDoubleToObject(pMnodeJson, "mnode_id", pMnodeDesc->mnode_id);
tjsonAddStringToObject(pMnodeJson, "mnode_ep", pMnodeDesc->mnode_ep);
tjsonAddStringToObject(pMnodeJson, "role", pMnodeDesc->role);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册