From 32a703b2a0b30841f04f9bf7953483777883dacf Mon Sep 17 00:00:00 2001
From: Ganlin Zhao
Date: Thu, 23 Sep 2021 09:45:31 +0800
Subject: [PATCH] [TD-6452]: taoskeeper metrics collector phase 1 taosd implementation

---
 src/inc/monitor.h                 |   6 +
 src/mnode/inc/mnodeInt.h          |   6 +-
 src/plugins/monitor/src/monMain.c | 278 +++++++++++++++++++++++++++++-
 3 files changed, 283 insertions(+), 7 deletions(-)

diff --git a/src/inc/monitor.h b/src/inc/monitor.h
index d2e5e06487..3744d0b11a 100644
--- a/src/inc/monitor.h
+++ b/src/inc/monitor.h
@@ -22,6 +22,11 @@ extern "C" {
 
 #include <stdint.h>
 
+#define monSaveLogs(level, ...) { \
+  monSaveLog(level, __VA_ARGS__); \
+  monSaveDnodeLog(level, __VA_ARGS__); \
+}
+
 typedef struct {
   char * acctId;
   int64_t currentPointsPerSecond;
@@ -53,6 +58,7 @@ void monStopSystem();
 void monCleanupSystem();
 void monSaveAcctLog(SAcctMonitorObj *pMonObj);
 void monSaveLog(int32_t level, const char *const format, ...);
+void monSaveDnodeLog(int32_t level, const char *const format, ...);
 void monExecuteSQL(char *sql);
 typedef void (*MonExecuteSQLCbFP)(void *param, TAOS_RES *, int code);
 void monExecuteSQLWithResultCallback(char *sql, MonExecuteSQLCbFP callback, void* param);
diff --git a/src/mnode/inc/mnodeInt.h b/src/mnode/inc/mnodeInt.h
index 7a791d76e6..aefdf23bda 100644
--- a/src/mnode/inc/mnodeInt.h
+++ b/src/mnode/inc/mnodeInt.h
@@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag;
 #define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
 #define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
 
-#define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
-#define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
-#define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
+#define mLError(...) { monSaveLogs(2, __VA_ARGS__); mError(__VA_ARGS__) }
+#define mLWarn(...) { monSaveLogs(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
+#define mLInfo(...) { monSaveLogs(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
 
 #ifdef __cplusplus
 }
diff --git a/src/plugins/monitor/src/monMain.c b/src/plugins/monitor/src/monMain.c
index 68eeb5ca2f..5a26722c79 100644
--- a/src/plugins/monitor/src/monMain.c
+++ b/src/plugins/monitor/src/monMain.c
@@ -38,6 +38,7 @@
 #define IP_LEN_STR TSDB_EP_LEN
 #define VGROUP_STATUS_LEN 512
 #define DNODE_INFO_LEN 128
+#define QUERY_ID_LEN 24
 #define CHECK_INTERVAL 1000
 
 typedef enum {
@@ -54,6 +55,8 @@ typedef enum {
   MON_CMD_CREATE_TB_DNODE,
   MON_CMD_CREATE_MT_DISKS,
   MON_CMD_CREATE_MT_VGROUPS,
+  MON_CMD_CREATE_MT_LOGS,
+  MON_CMD_CREATE_TB_DNODE_LOG,
   MON_CMD_MAX
 } EMonCmd;
 
@@ -77,6 +80,8 @@ static SMonConn tsMonitor = {0};
 static void monSaveSystemInfo();
 static void monSaveClusterInfo();
 static void monSaveDnodesInfo();
+static void monSaveVgroupsInfo();
+static void monSaveSlowQueryInfo();
 static void *monThreadFunc(void *param);
 static void monBuildMonitorSql(char *sql, int32_t cmd);
 extern int32_t (*monStartSystemFp)();
@@ -187,6 +192,8 @@ static void *monThreadFunc(void *param) {
         monSaveSystemInfo();
         monSaveClusterInfo();
         monSaveDnodesInfo();
+        monSaveVgroupsInfo();
+        monSaveSlowQueryInfo();
       }
     }
   }
@@ -244,9 +251,9 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) {
              tsMonitorDbName, TSDB_DEFAULT_USER);
   } else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) {
     snprintf(sql, SQL_LENGTH,
-             "create table if not exists %s.slowquery(ts timestamp, username "
-             "binary(%d), created_time timestamp, time bigint, sql binary(%d))",
-             tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN);
+             "create table if not exists %s.slowquery(ts timestamp, query_id "
+             "binary(%d), username binary(%d), qid binary(%d), created_time timestamp, time bigint, end_point binary(%d), sql binary(%d))",
+             tsMonitorDbName, QUERY_ID_LEN, TSDB_TABLE_FNAME_LEN - 1, QUERY_ID_LEN, TSDB_EP_LEN, TSDB_SLOW_QUERY_SQL_LEN);
   } else if (cmd == MON_CMD_CREATE_TB_LOG) {
     snprintf(sql, SQL_LENGTH,
              "create table if not exists %s.log(ts timestamp, level tinyint, "
@@ -295,13 +302,21 @@ static void monBuildMonitorSql(char *sql, int32_t cmd) {
   } else if (cmd == MON_CMD_CREATE_MT_VGROUPS) {
     snprintf(sql, SQL_LENGTH,
              "create table if not exists %s.vgroups_info(ts timestamp"
-             ", database_name binary(%d), is_mnode bool"
+             ", database_name binary(%d)"
              ", tables_num int, status binary(%d)"
              ", online_vnodes tinyint"
              ", dnode_ids binary(%d), dnode_roles binary(%d)"
              ") tags (vgroup_id int)",
              tsMonitorDbName, TSDB_DB_NAME_LEN, VGROUP_STATUS_LEN,
              DNODE_INFO_LEN, DNODE_INFO_LEN);
+  } else if (cmd == MON_CMD_CREATE_MT_LOGS) {
+    snprintf(sql, SQL_LENGTH,
+             "create table if not exists %s.logs(ts timestamp, level tinyint, "
+             "content binary(%d)) tags (dnode_id int, dnode_ep binary(%d))",
+             tsMonitorDbName, LOG_LEN_STR, TSDB_EP_LEN);
+  } else if (cmd == MON_CMD_CREATE_TB_DNODE_LOG) {
+    snprintf(sql, SQL_LENGTH, "create table if not exists %s.dnode_%d_log using %s.logs tags(%d, '%s')", tsMonitorDbName,
+             dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp);
   }
 
   sql[SQL_LENGTH] = 0;
@@ -787,6 +802,232 @@ static void monSaveDnodesInfo() {
   }
 }
 
+// Formats text into buf at offset pos without writing past cap bytes (NUL included).
+// Returns the new offset, or cap when the text did not fit, so a caller can chain
+// appends and test "pos >= cap" once at the end.
+static int32_t monAppendStr(char *buf, int32_t cap, int32_t pos, const char *format, ...) {
+  if (pos < 0 || pos >= cap) return cap;
+
+  va_list args;
+  va_start(args, format);
+  int32_t len = vsnprintf(buf + pos, (size_t)(cap - pos), format, args);
+  va_end(args);
+
+  return (len < 0 || len >= cap - pos) ? cap : pos + len;
+}
+
+// Makes sure the sub-table <monitor db>.vgroup_<vgId> exists, creating it from the
+// vgroups_info super table when "describe" fails. Returns the taos error code of
+// the last statement executed (TSDB_CODE_SUCCESS when the table is usable).
+static int32_t checkCreateVgroupTable(int32_t vgId) {
+  char subsql[256] = {0};
+
+  snprintf(subsql, sizeof(subsql), "describe %s.vgroup_%d", tsMonitorDbName, vgId);
+  TAOS_RES *result = taos_query(tsMonitor.conn, subsql);
+  int32_t   code = taos_errno(result);
+  taos_free_result(result);
+  if (code == TSDB_CODE_SUCCESS) {
+    return code;
+  }
+
+  monError("table vgroup_%d not exist, create table vgroup_%d", vgId, vgId);
+  // bound the write by the local array itself, not by SQL_LENGTH
+  snprintf(subsql, sizeof(subsql), "create table if not exists %s.vgroup_%d using %s.vgroups_info tags(%d)",
+           tsMonitorDbName, vgId, tsMonitorDbName, vgId);
+  result = taos_query(tsMonitor.conn, subsql);
+  code = taos_errno(result);
+  taos_free_result(result);
+
+  return code;
+}
+
+// Records one row per vgroup of dbName into <monitor db>.vgroup_<vgId>.
+// sql is scratch space of at least SQL_LENGTH + 1 bytes; dbName must be NUL-terminated.
+// Values are appended in the column order of "show <db>.vgroups"; the statement is
+// closed when the "compacting" column is reached. Always returns TSDB_CODE_SUCCESS;
+// failures are only logged.
+static uint32_t monBuildVgroupsInfoSql(char *sql, char *dbName) {
+  int64_t ts = taosGetTimestampUs();
+
+  memset(sql, 0, SQL_LENGTH + 1);
+  snprintf(sql, SQL_LENGTH, "show %s.vgroups", dbName);
+  TAOS_RES *result = taos_query(tsMonitor.conn, sql);
+  int32_t   code = taos_errno(result);
+  if (code != TSDB_CODE_SUCCESS) {
+    monError("failed to show vgroups of db:%s, reason:%s", dbName, tstrerror(code));
+    taos_free_result(result);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  TAOS_ROW    row;
+  int32_t     num_fields = taos_num_fields(result);
+  TAOS_FIELD *fields = taos_fetch_fields(result);
+
+  while ((row = taos_fetch_row(result))) {
+    // print binary columns by their reported length instead of relying on a NUL terminator
+    int *lengths = taos_fetch_lengths(result);
+    if (lengths == NULL) break;
+
+    char    v_dnode_ids[DNODE_INFO_LEN] = {0};
+    char    v_dnode_status[DNODE_INFO_LEN] = {0};
+    int32_t ids_pos = 0;
+    int32_t status_pos = 0;
+    int32_t vgId = 0;
+    int32_t pos = 0;
+    bool    complete = false;
+
+    for (int i = 0; i < num_fields; ++i) {
+      if (row[i] == NULL) continue;
+
+      const char *v_dnode_str = strchr(fields[i].name, '_');
+      if (strcmp(fields[i].name, "vgId") == 0) {
+        vgId = *(int32_t *)row[i];
+        if (checkCreateVgroupTable(vgId) != TSDB_CODE_SUCCESS) {
+          taos_free_result(result);  // do not leak the result set on the early exit
+          return TSDB_CODE_SUCCESS;
+        }
+        memset(sql, 0, SQL_LENGTH + 1);
+        pos = monAppendStr(sql, SQL_LENGTH, 0, "insert into %s.vgroup_%d values(%" PRId64 ", \"%s\"",
+                           tsMonitorDbName, vgId, ts, dbName);
+      } else if (strcmp(fields[i].name, "tables") == 0) {
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", %d", *(int32_t *)row[i]);
+      } else if (strcmp(fields[i].name, "status") == 0) {
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", \"%.*s\"", lengths[i], (char *)row[i]);
+      } else if (strcmp(fields[i].name, "onlines") == 0) {
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", %d", *(int32_t *)row[i]);
+      } else if (v_dnode_str && strcmp(v_dnode_str, "_dnode") == 0) {
+        // accumulate every replica ("1;2;3;") instead of keeping only the last one
+        ids_pos = monAppendStr(v_dnode_ids, DNODE_INFO_LEN, ids_pos, "%d;", *(int16_t *)row[i]);
+      } else if (v_dnode_str && strcmp(v_dnode_str, "_status") == 0) {
+        status_pos = monAppendStr(v_dnode_status, DNODE_INFO_LEN, status_pos, "%.*s;", lengths[i], (char *)row[i]);
+      } else if (strcmp(fields[i].name, "compacting") == 0) {
+        // flush dnode_ids and dnode_roles into sql
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", \"%s\", \"%s\")", v_dnode_ids, v_dnode_status);
+        complete = true;
+      }
+    }
+
+    if (!complete || pos >= SQL_LENGTH) {
+      monError("failed to build sql for vgroup_%d info, sql:%s", vgId, sql);
+      continue;
+    }
+
+    monDebug("sql:%s", sql);
+    TAOS_RES *res = taos_query(tsMonitor.conn, sql);
+    code = taos_errno(res);
+    taos_free_result(res);
+    if (code != 0) {
+      monError("failed to save vgroup_%d info, reason:%s, sql:%s", vgId, tstrerror(code), sql);
+    } else {
+      monDebug("successfully to save vgroup_%d info, sql:%s", vgId, sql);
+    }
+  }
+  taos_free_result(result);
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Walks "show databases" and records the vgroup status of every database found.
+static void monSaveVgroupsInfo() {
+  char     *sql = tsMonitor.sql;
+  TAOS_RES *result = taos_query(tsMonitor.conn, "show databases");
+  int32_t   code = taos_errno(result);
+  if (code != TSDB_CODE_SUCCESS) {
+    monError("failed to show databases, reason:%s", tstrerror(code));
+    taos_free_result(result);
+    return;
+  }
+
+  TAOS_ROW    row;
+  int32_t     num_fields = taos_num_fields(result);
+  TAOS_FIELD *fields = taos_fetch_fields(result);
+
+  while ((row = taos_fetch_row(result))) {
+    int *lengths = taos_fetch_lengths(result);
+    if (lengths == NULL) break;
+
+    for (int i = 0; i < num_fields; ++i) {
+      // database name
+      if (strcmp(fields[i].name, "name") == 0 && row[i] != NULL) {
+        // hand the callee a NUL-terminated copy; it formats the name with %s
+        char dbName[TSDB_DB_NAME_LEN + 1] = {0};
+        snprintf(dbName, sizeof(dbName), "%.*s", lengths[i], (char *)row[i]);
+        monBuildVgroupsInfoSql(sql, dbName);
+      }
+    }
+  }
+
+  taos_free_result(result);
+}
+
+// Records every query listed by "show queries" into <monitor db>.slowquery,
+// one insert statement per query. Values are appended in the column order of
+// the result set.
+// NOTE(review): user, ep and sql text are embedded between double quotes without
+// escaping, so a query containing '"' yields a statement that fails to parse.
+static void monSaveSlowQueryInfo() {
+  int64_t ts = taosGetTimestampUs();
+  char   *sql = tsMonitor.sql;
+
+  TAOS_RES *result = taos_query(tsMonitor.conn, "show queries");
+  int32_t   code = taos_errno(result);
+  if (code != TSDB_CODE_SUCCESS) {
+    monError("failed to show queries, reason:%s", tstrerror(code));
+    taos_free_result(result);
+    return;
+  }
+
+  TAOS_ROW    row;
+  int32_t     num_fields = taos_num_fields(result);
+  TAOS_FIELD *fields = taos_fetch_fields(result);
+
+  while ((row = taos_fetch_row(result))) {
+    int *lengths = taos_fetch_lengths(result);
+    if (lengths == NULL) break;
+
+    // one statement per query: appending every row after a single "values(" prefix
+    // is invalid SQL once two queries are listed; ts is bumped so rows stay distinct
+    bool    complete = false;
+    int32_t pos = monAppendStr(sql, SQL_LENGTH, 0, "insert into %s.slowquery values(%" PRId64, tsMonitorDbName, ts++);
+
+    for (int i = 0; i < num_fields; ++i) {
+      if (row[i] == NULL) continue;
+
+      const char *name = fields[i].name;
+      if (strcmp(name, "query_id") == 0 || strcmp(name, "user") == 0 || strcmp(name, "qid") == 0 ||
+          strcmp(name, "ep") == 0) {
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", \"%.*s\"", lengths[i], (char *)row[i]);
+      } else if (strcmp(name, "created_time") == 0 || strcmp(name, "time") == 0) {
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", %" PRId64, *(int64_t *)row[i]);
+      } else if (strcmp(name, "sql") == 0) {
+        // clip the query text so the closing `")` always fits in the buffer
+        int32_t room = SQL_LENGTH - pos - 6;
+        int32_t sql_len = lengths[i] < room ? lengths[i] : room;
+        if (sql_len < 0) sql_len = 0;
+        pos = monAppendStr(sql, SQL_LENGTH, pos, ", \"%.*s\")", sql_len, (char *)row[i]);
+        complete = true;
+      }
+    }
+
+    if (!complete || pos >= SQL_LENGTH) {
+      monError("failed to build slowquery sql, sql:%s", sql);
+      continue;
+    }
+
+    monDebug("sql:%s", sql);
+    TAOS_RES *res = taos_query(tsMonitor.conn, sql);
+    code = taos_errno(res);
+    taos_free_result(res);
+    if (code != 0) {
+      monError("failed to save slowquery info, reason:%s, sql:%s", tstrerror(code), sql);
+    } else {
+      monDebug("successfully to save slowquery info, sql:%s", sql);
+    }
+  }
+
+  taos_free_result(result);
+}
+
 static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
   int32_t c = taos_errno(result);
   if (c != TSDB_CODE_SUCCESS) {
@@ -854,6 +1095,35 @@ void monSaveLog(int32_t level, const char *const format, ...) {
   taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log");
 }
 
+// Saves one log record into this dnode's sub-table <monitor db>.dnode_<id>_log.
+// Does nothing until the monitor has reached MON_STATE_INITED.
+// NOTE(review): the message is embedded between single quotes without escaping.
+void monSaveDnodeLog(int32_t level, const char *const format, ...) {
+  if (tsMonitor.state != MON_STATE_INITED) return;
+
+  va_list argpointer;
+  char    sql[SQL_LENGTH] = {0};
+  int32_t max_length = SQL_LENGTH - 30;
+  int32_t len = snprintf(sql, (size_t)max_length, "insert into %s.dnode_%d_log values(%" PRId64 ", %d,'", tsMonitorDbName,
+                         dnodeGetDnodeId(), taosGetTimestampUs(), level);
+  if (len < 0 || len >= max_length) return;
+
+  va_start(argpointer, format);
+  int32_t msg_len = vsnprintf(sql + len, (size_t)(max_length - len), format, argpointer);
+  va_end(argpointer);
+  if (msg_len < 0) return;
+
+  // on truncation vsnprintf() reports the untruncated length; continue from the
+  // terminating NUL it actually wrote so the closing quote stays inside the string
+  len += msg_len;
+  if (len >= max_length) len = max_length - 1;
+
+  snprintf(sql + len, sizeof(sql) - (size_t)len, "')");
+
+  monDebug("save log, sql: %s", sql);
+  taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log");
+}
+
 void monExecuteSQL(char *sql) {
   if (tsMonitor.state != MON_STATE_INITED) return;
 
-- 
GitLab