提交 32a703b2 编写于 作者: G Ganlin Zhao

[TD-6452]<feature>: taoskeeper metrics collector phase 1 taosd implementation

上级 aefabc73
...@@ -22,6 +22,11 @@ extern "C" { ...@@ -22,6 +22,11 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#define monSaveLogs(level, ...) { \
monSaveLog(level, __VA_ARGS__); \
monSaveDnodeLog(level, __VA_ARGS__); \
}
typedef struct { typedef struct {
char * acctId; char * acctId;
int64_t currentPointsPerSecond; int64_t currentPointsPerSecond;
...@@ -53,6 +58,7 @@ void monStopSystem(); ...@@ -53,6 +58,7 @@ void monStopSystem();
void monCleanupSystem(); void monCleanupSystem();
void monSaveAcctLog(SAcctMonitorObj *pMonObj); void monSaveAcctLog(SAcctMonitorObj *pMonObj);
void monSaveLog(int32_t level, const char *const format, ...); void monSaveLog(int32_t level, const char *const format, ...);
void monSaveDnodeLog(int32_t level, const char *const format, ...);
void monExecuteSQL(char *sql); void monExecuteSQL(char *sql);
typedef void (*MonExecuteSQLCbFP)(void *param, TAOS_RES *, int code); typedef void (*MonExecuteSQLCbFP)(void *param, TAOS_RES *, int code);
void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param); void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param);
......
...@@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag; ...@@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag;
#define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }} #define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }} #define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
#define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) } #define mLError(...) { monSaveLogs(2, __VA_ARGS__); mError(__VA_ARGS__) }
#define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) } #define mLWarn(...) { monSaveLogs(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
#define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) } #define mLInfo(...) { monSaveLogs(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -38,6 +38,7 @@ ...@@ -38,6 +38,7 @@
#define IP_LEN_STR TSDB_EP_LEN #define IP_LEN_STR TSDB_EP_LEN
#define VGROUP_STATUS_LEN 512 #define VGROUP_STATUS_LEN 512
#define DNODE_INFO_LEN 128 #define DNODE_INFO_LEN 128
#define QUERY_ID_LEN 24
#define CHECK_INTERVAL 1000 #define CHECK_INTERVAL 1000
typedef enum { typedef enum {
...@@ -54,6 +55,8 @@ typedef enum { ...@@ -54,6 +55,8 @@ typedef enum {
MON_CMD_CREATE_TB_DNODE, MON_CMD_CREATE_TB_DNODE,
MON_CMD_CREATE_MT_DISKS, MON_CMD_CREATE_MT_DISKS,
MON_CMD_CREATE_MT_VGROUPS, MON_CMD_CREATE_MT_VGROUPS,
MON_CMD_CREATE_MT_LOGS,
MON_CMD_CREATE_TB_DNODE_LOG,
MON_CMD_MAX MON_CMD_MAX
} EMonCmd; } EMonCmd;
...@@ -77,6 +80,8 @@ static SMonConn tsMonitor = {0}; ...@@ -77,6 +80,8 @@ static SMonConn tsMonitor = {0};
static void monSaveSystemInfo(); static void monSaveSystemInfo();
static void monSaveClusterInfo(); static void monSaveClusterInfo();
static void monSaveDnodesInfo(); static void monSaveDnodesInfo();
static void monSaveVgroupsInfo();
static void monSaveSlowQueryInfo();
static void *monThreadFunc(void *param); static void *monThreadFunc(void *param);
static void monBuildMonitorSql(char *sql, int32_t cmd); static void monBuildMonitorSql(char *sql, int32_t cmd);
extern int32_t (*monStartSystemFp)(); extern int32_t (*monStartSystemFp)();
...@@ -187,6 +192,8 @@ static void *monThreadFunc(void *param) { ...@@ -187,6 +192,8 @@ static void *monThreadFunc(void *param) {
monSaveSystemInfo(); monSaveSystemInfo();
monSaveClusterInfo(); monSaveClusterInfo();
monSaveDnodesInfo(); monSaveDnodesInfo();
monSaveVgroupsInfo();
monSaveSlowQueryInfo();
} }
} }
} }
...@@ -244,9 +251,9 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) { ...@@ -244,9 +251,9 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) {
tsMonitorDbName, TSDB_DEFAULT_USER); tsMonitorDbName, TSDB_DEFAULT_USER);
} else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) { } else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) {
snprintf(sql, SQL_LENGTH, snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username " "create table if not exists %s.slowquery(ts timestamp, query_id "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))", "binary(%d), username binary(%d), qid binary(%d), created_time timestamp, time bigint, end_point binary(%d), sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN); tsMonitorDbName, QUERY_ID_LEN, TSDB_TABLE_FNAME_LEN - 1, QUERY_ID_LEN, TSDB_EP_LEN, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MON_CMD_CREATE_TB_LOG) { } else if (cmd == MON_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH, snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, " "create table if not exists %s.log(ts timestamp, level tinyint, "
...@@ -295,13 +302,21 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) { ...@@ -295,13 +302,21 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) {
} else if (cmd == MON_CMD_CREATE_MT_VGROUPS) { } else if (cmd == MON_CMD_CREATE_MT_VGROUPS) {
snprintf(sql, SQL_LENGTH, snprintf(sql, SQL_LENGTH,
"create table if not exists %s.vgroups_info(ts timestamp" "create table if not exists %s.vgroups_info(ts timestamp"
", database_name binary(%d), is_mnode bool" ", database_name binary(%d)"
", tables_num int, status binary(%d)" ", tables_num int, status binary(%d)"
", online_vnodes tinyint" ", online_vnodes tinyint"
", dnode_ids binary(%d), dnode_roles binary(%d)" ", dnode_ids binary(%d), dnode_roles binary(%d)"
") tags (vgroup_id int)", ") tags (vgroup_id int)",
tsMonitorDbName, TSDB_DB_NAME_LEN, VGROUP_STATUS_LEN, tsMonitorDbName, TSDB_DB_NAME_LEN, VGROUP_STATUS_LEN,
DNODE_INFO_LEN, DNODE_INFO_LEN); DNODE_INFO_LEN, DNODE_INFO_LEN);
} else if (cmd == MON_CMD_CREATE_MT_LOGS) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.logs(ts timestamp, level tinyint, "
"content binary(%d)) tags (dnode_id int, dnode_ep binary(%d))",
tsMonitorDbName, LOG_LEN_STR, TSDB_EP_LEN);
} else if (cmd == MON_CMD_CREATE_TB_DNODE_LOG) {
snprintf(sql, SQL_LENGTH, "create table if not exists %s.dnode_%d_log using %s.logs tags(%d, '%s')", tsMonitorDbName,
dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp);
} }
sql[SQL_LENGTH] = 0; sql[SQL_LENGTH] = 0;
...@@ -787,6 +802,163 @@ static void monSaveDnodesInfo() { ...@@ -787,6 +802,163 @@ static void monSaveDnodesInfo() {
} }
} }
static int32_t checkCreateVgroupTable(int32_t vgId) {
char subsql[256];
bool create_table = false;
int32_t code = TSDB_CODE_SUCCESS;
memset(subsql, 0, sizeof(subsql));
sprintf(subsql, "describe %s.vgroup_%d", tsMonitorDbName, vgId);
TAOS_RES *result = taos_query(tsMonitor.conn, subsql);
code = taos_errno(result);
if (code != 0) {
create_table = true;
snprintf(subsql, SQL_LENGTH, "create table if not exists %s.vgroup_%d using %s.vgroups_info tags(%d)",
tsMonitorDbName, vgId, tsMonitorDbName, vgId);
monError("table vgroup_%d not exist, create table vgroup_%d", vgId, vgId);
}
taos_free_result(result);
if (create_table == true) {
result = taos_query(tsMonitor.conn, subsql);
code = taos_errno(result);
taos_free_result(result);
}
return code;
}
static uint32_t monBuildVgroupsInfoSql(char *sql, char *dbName) {
char v_dnode_ids[256], v_dnode_status[1024];
int64_t ts = taosGetTimestampUs();
memset(sql, 0, SQL_LENGTH + 1);
sprintf(sql, "show %s.vgroups", dbName);
TAOS_RES *result = taos_query(tsMonitor.conn, sql);
TAOS_ROW row;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
int32_t vgId;
int32_t pos = 0;
for (int i = 0; i < num_fields; ++i) {
const char *v_dnode_str = strchr(fields[i].name, '_');
if (strcmp(fields[i].name, "vgId") == 0) {
vgId = *(int32_t *)row[i];
if (checkCreateVgroupTable(vgId) == TSDB_CODE_SUCCESS) {
memset(sql, 0, SQL_LENGTH + 1);
pos += snprintf(sql, SQL_LENGTH, "insert into %s.vgroup_%d values(%" PRId64 ", \"%s\"",
tsMonitorDbName, vgId, ts, dbName);
} else {
return TSDB_CODE_SUCCESS;
}
} else if (strcmp(fields[i].name, "tables") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", %d", *(int32_t *)row[i]);
} else if (strcmp(fields[i].name, "status") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\"", (char *)row[i]);
} else if (strcmp(fields[i].name, "onlines") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", %d", *(int32_t *)row[i]);
} else if (v_dnode_str && strcmp(v_dnode_str, "_dnode") == 0) {
snprintf(v_dnode_ids, sizeof(v_dnode_ids), "%d;", *(int16_t *)row[i]);
} else if (v_dnode_str && strcmp(v_dnode_str, "_status") == 0) {
snprintf(v_dnode_status, sizeof(v_dnode_status), "%s;", (char *)row[i]);
} else if (strcmp(fields[i].name, "compacting") == 0) {
//flush dnode_ids and dnode_role in to sql
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\", \"%s\")", v_dnode_ids, v_dnode_status);
}
}
monError("sql:%s", sql);
TAOS_RES *res = taos_query(tsMonitor.conn, sql);
int32_t code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monError("failed to save vgroup_%d info, reason:%s, sql:%s", vgId, tstrerror(code), tsMonitor.sql);
} else {
monDebug("successfully to save vgroup_%d info, sql:%s", vgId, tsMonitor.sql);
}
}
taos_free_result(result);
return TSDB_CODE_SUCCESS;
}
static void monSaveVgroupsInfo() {
char * sql = tsMonitor.sql;
TAOS_RES *result = taos_query(tsMonitor.conn, "show databases");
TAOS_ROW row;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
for (int i = 0; i < num_fields; ++i) {
//database name
if (strcmp(fields[i].name, "name") == 0) {
monBuildVgroupsInfoSql(sql, (char *)row[i]);
}
}
}
taos_free_result(result);
}
static void monSaveSlowQueryInfo() {
int64_t ts = taosGetTimestampUs();
char * sql = tsMonitor.sql;
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.slowquery values(%" PRId64, tsMonitorDbName, ts);
bool has_slowquery = false;
TAOS_RES *result = taos_query(tsMonitor.conn, "show queries");
TAOS_ROW row;
int32_t num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
while ((row = taos_fetch_row(result))) {
for (int i = 0; i < num_fields; ++i) {
if (strcmp(fields[i].name, "query_id") == 0) {
has_slowquery = true;
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\"", (char *)row[i]);
} else if (strcmp(fields[i].name, "user") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\"", (char *)row[i]);
} else if (strcmp(fields[i].name, "qid") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\"", (char *)row[i]);
} else if (strcmp(fields[i].name, "created_time") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", %" PRId64 "", *(int64_t *)row[i]);
} else if (strcmp(fields[i].name, "time") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", %" PRId64 "", *(int64_t *)row[i]);
} else if (strcmp(fields[i].name, "ep") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\"", (char *)row[i]);
} else if (strcmp(fields[i].name, "sql") == 0) {
pos += snprintf(sql + pos, SQL_LENGTH, ", \"%s\")", (char *)row[i]);
}
}
}
monError("sql:%s", sql);
taos_free_result(result);
if (!has_slowquery) {
return;
}
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
int32_t code = taos_errno(res);
taos_free_result(res);
if (code != 0) {
monError("failed to save slowquery info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
} else {
monDebug("successfully to save slowquery info, sql:%s", tsMonitor.sql);
}
}
static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) { static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
int32_t c = taos_errno(result); int32_t c = taos_errno(result);
if (c != TSDB_CODE_SUCCESS) { if (c != TSDB_CODE_SUCCESS) {
...@@ -854,6 +1026,27 @@ void monSaveLog(int32_t level, const char *const format, ...) { ...@@ -854,6 +1026,27 @@ void monSaveLog(int32_t level, const char *const format, ...) {
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log"); taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log");
} }
void monSaveDnodeLog(int32_t level, const char *const format, ...) {
if (tsMonitor.state != MON_STATE_INITED) return;
va_list argpointer;
char sql[SQL_LENGTH] = {0};
int32_t max_length = SQL_LENGTH - 30;
int32_t len = snprintf(sql, (size_t)max_length, "insert into %s.dnode_%d_log values(%" PRId64 ", %d,'", tsMonitorDbName,
dnodeGetDnodeId(), taosGetTimestampUs(), level);
va_start(argpointer, format);
len += vsnprintf(sql + len, (size_t)(max_length - len), format, argpointer);
va_end(argpointer);
if (len > max_length) len = max_length;
len += sprintf(sql + len, "')");
sql[len++] = 0;
monError("save log, sql: %s", sql);
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log");
}
void monExecuteSQL(char *sql) { void monExecuteSQL(char *sql) {
if (tsMonitor.state != MON_STATE_INITED) return; if (tsMonitor.state != MON_STATE_INITED) return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册