提交 aefabc73 编写于 作者: G Ganlin Zhao

[TD-6452]<feature>: taoskeeper metrics collector phase 1 taosd implementation

上级 29238fa4
......@@ -34,6 +34,7 @@ bool taosReadProcIO(int64_t* rchars, int64_t* wchars);
bool taosGetProcIO(float *readKB, float *writeKB);
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes);
bool taosGetBandSpeed(float *bandSpeedKb);
bool taosGetNetworkIO(float *netInKb, float *netOutKb);
void taosGetDisk();
bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) ;
bool taosGetProcMemory(float *memoryUsedMB) ;
......
......@@ -333,7 +333,9 @@ int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
}
bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
*bytes = 0;
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
FILE *fp = fopen(tsSysNetFile, "r");
if (fp == NULL) {
uError("open file:%s failed", tsSysNetFile);
......@@ -348,7 +350,7 @@ bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
memset(line, 0, len);
int64_t o_rbytes = 0;
int64_t rpackts = 0;
int64_t rpackets = 0;
int64_t o_tbytes = 0;
int64_t tpackets = 0;
int64_t nouse1 = 0;
......@@ -374,10 +376,10 @@ bool taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
sscanf(line,
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64
" %" PRId64,
nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
nouse0, &o_rbytes, &rpackets, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
if (rbytes) *rbytes = o_rbytes;
if (tbytes) *tbytes = o_tbytes;
*bytes += (o_rbytes + o_tbytes);
if (bytes) *bytes += (o_rbytes + o_tbytes);
}
tfree(line);
......@@ -422,6 +424,45 @@ bool taosGetBandSpeed(float *bandSpeedKb) {
return true;
}
bool taosGetNetworkIO(float *netInKb, float *netOutKb) {
static int64_t lastBytesIn = 0, lastBytesOut = 0;
static time_t lastTimeIO = 0;
int64_t curBytesIn = 0, curBytesOut = 0;
time_t curTime = time(NULL);
if (!taosGetCardInfo(NULL, &curBytesIn, &curBytesOut)) {
return false;
}
if (lastTimeIO == 0 || lastBytesIn == 0 || lastBytesOut == 0) {
lastTimeIO = curTime;
lastBytesIn = curBytesIn; lastBytesOut = curBytesOut;
*netInKb = 0;
*netOutKb = 0;
return true;
}
if (lastTimeIO >= curTime || lastBytesIn > curBytesIn || lastBytesOut > curBytesOut) {
lastTimeIO = curTime;
lastBytesIn = curBytesIn; lastBytesOut = curBytesOut;
*netInKb = 0;
*netOutKb = 0;
return true;
}
double totalBytesIn = (double)(curBytesIn - lastBytesIn) / 1024 * 8; // Kb
*netInKb = (float)(totalBytesIn / (double)(curTime - lastTimeIO));
double totalBytesOut = (double)(curBytesOut - lastBytesOut) / 1024 * 8; // Kb
*netOutKb = (float)(totalBytesOut / (double)(curTime - lastTimeIO));
lastTimeIO = curTime;
lastBytesIn = curBytesIn;
lastBytesOut = curBytesOut;
return true;
}
bool taosReadProcIO(int64_t *rchars, int64_t *wchars) {
FILE *fp = fopen(tsProcIOFile, "r");
if (fp == NULL) {
......
......@@ -51,6 +51,7 @@ typedef enum {
//followings are extension for taoskeeper
MON_CMD_CREATE_TB_CLUSTER,
MON_CMD_CREATE_MT_DNODES,
MON_CMD_CREATE_TB_DNODE,
MON_CMD_CREATE_MT_DISKS,
MON_CMD_CREATE_MT_VGROUPS,
MON_CMD_MAX
......@@ -75,6 +76,7 @@ typedef struct {
static SMonConn tsMonitor = {0};
static void monSaveSystemInfo();
static void monSaveClusterInfo();
static void monSaveDnodesInfo();
static void *monThreadFunc(void *param);
static void monBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monStartSystemFp)();
......@@ -184,6 +186,7 @@ static void *monThreadFunc(void *param) {
if (accessTimes % tsMonitorInterval == 0 || accessTimes == 1) {
monSaveSystemInfo();
monSaveClusterInfo();
monSaveDnodesInfo();
}
}
}
......@@ -278,6 +281,9 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) {
", has_mnode bool"
") tags (dnode_id int, dnode_ep binary(%d))",
tsMonitorDbName, TSDB_EP_LEN);
} else if (cmd == MON_CMD_CREATE_TB_DNODE) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.dnode_%d using %s.dnodes_info tags(%d, '%s')", tsMonitorDbName,
dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp);
} else if (cmd == MON_CMD_CREATE_MT_DISKS) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.disks_info(ts timestamp"
......@@ -620,6 +626,110 @@ static int32_t monBuildConnsTotalSql(char *sql) {
return sprintf(sql, ", %d)", totalConns);
}
static int32_t monBuildDnodeUptimeSql(char *sql) {
int64_t dnodeUptime = 0;
TAOS_RES *result = taos_query(tsMonitor.conn, "show dnodes");
TAOS_ROW row;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
for (int i = 0; i < num_fields; ++i) {
if (strcmp(fields[i].name, "create_time") == 0) {
int64_t now = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI);
//dnodes uptime in seconds
dnodeUptime = (now - *(int64_t *)row[i]) / 1000;
}
}
}
taos_free_result(result);
return sprintf(sql, ", %" PRId64, dnodeUptime);
}
static int32_t monBuildNetworkIOSql(char *sql) {
float netInKb = 0, netOutKb = 0;
bool suc = taosGetNetworkIO(&netInKb, &netOutKb);
if (!suc) {
monDebug("failed to get network I/O info");
}
return sprintf(sql, ", %f, %f", netInKb, netOutKb);
}
static int32_t monBuildReqRateSql(char *sql) {
SStatisInfo info = dnodeGetStatisInfo();
float interval = tsMonitorInterval * 1.0;
float httpReqRate = info.httpReqNum / interval;
float queryReqRate = info.queryReqNum / interval;
float submitReqRate = info.submitReqNum / interval;
return sprintf(sql, ", %d, %f, %d, %f, %d, %f", info.httpReqNum, httpReqRate,
info.queryReqNum, queryReqRate,
info.submitReqNum, submitReqRate);
}
static int32_t monBuildDnodeErrorsSql(char *sql) {
int32_t dnode_err = 0;
return sprintf(sql, ", %d", dnode_err);
}
static int32_t monBuildDnodeVnodesSql(char *sql) {
int32_t vnodeNum = 0, masterNum = 0;
char sqlStr[TSDB_EP_LEN + 15];
memset(sqlStr, 0, sizeof(sqlStr));
sprintf(sqlStr, "show vnodes \"%s\"", tsLocalEp);
TAOS_RES *result = taos_query(tsMonitor.conn, sqlStr);
TAOS_ROW row;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
vnodeNum += 1;
for (int i = 0; i < num_fields; ++i) {
if (strcmp(fields[i].name, "status") == 0) {
int32_t charLen = monGetRowElemCharLen(fields[i], (char *)row[i]);
if (strncmp((char *)row[i], "master", charLen) == 0) {
masterNum += 1;
}
}
}
}
taos_free_result(result);
return sprintf(sql, ", %d, %d", vnodeNum, masterNum);
}
static int32_t monBuildDnodeMnodeSql(char *sql) {
bool has_mnode = false;
TAOS_RES *result = taos_query(tsMonitor.conn, "show mnodes");
TAOS_ROW row;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
for (int i = 0; i < num_fields; ++i) {
if (strcmp(fields[i].name, "end_point") == 0) {
int32_t charLen = monGetRowElemCharLen(fields[i], (char *)row[i]);
if (strncmp((char *)row[i], tsLocalEp, charLen) == 0) {
has_mnode = true;
}
}
}
}
taos_free_result(result);
return sprintf(sql, ", %s)", has_mnode ? "true" : "false");
}
static int32_t monBuildDnodeDiskSql(char *sql) {
float taosdDataDirGB = 0;
return sprintf(sql, ", %f, %f, %f", taosdDataDirGB, tsUsedDataDirGB, tsTotalDataDirGB);
}
static void monSaveClusterInfo() {
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitor.sql;
......@@ -648,6 +758,35 @@ static void monSaveClusterInfo() {
}
}
static void monSaveDnodesInfo() {
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitor.sql;
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dnode_%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts);
pos += monBuildDnodeUptimeSql(sql + pos);
pos += monBuildCpuSql(sql + pos);
pos += monBuildMemorySql(sql + pos);
pos += monBuildDnodeDiskSql(sql + pos);
pos += monBuildNetworkIOSql(sql + pos);
pos += monBuildIoSql(sql + pos);
pos += monBuildReqRateSql(sql + pos);
pos += monBuildDnodeErrorsSql(sql + pos);
pos += monBuildDnodeVnodesSql(sql + pos);
pos += monBuildDnodeMnodeSql(sql + pos);
monError("sql:%s", sql);
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int32_t code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monError("failed to save dnode_%d info, reason:%s, sql:%s", dnodeGetDnodeId(), tstrerror(code), tsMonitor.sql);
} else {
monDebug("successfully to save dnode_%d info, sql:%s", dnodeGetDnodeId(), tsMonitor.sql);
}
}
static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册