From 9a6880bc9ce1fbf99b8720015a8b31434fea635f Mon Sep 17 00:00:00 2001
From: Carman Zhang
Date: Fri, 23 Nov 2018 11:41:56 +0800
Subject: [PATCH] support node-pod/namespace-workload sorting and paging

---
 .../v1alpha/monitoring/monitor_handler.go |  22 +++-
 pkg/client/prometheusclient.go            |   3 +
 pkg/models/metrics/metrics.go             | 104 +++++++++++++++---
 pkg/models/metrics/metricsrule.go         |  13 ++-
 pkg/models/metrics/metricsruleconst.go    |  34 ++++--
 5 files changed, 147 insertions(+), 29 deletions(-)

diff --git a/pkg/apis/v1alpha/monitoring/monitor_handler.go b/pkg/apis/v1alpha/monitoring/monitor_handler.go
index 2cbbf812..6471b2f3 100755
--- a/pkg/apis/v1alpha/monitoring/monitor_handler.go
+++ b/pkg/apis/v1alpha/monitoring/monitor_handler.go
@@ -59,10 +59,22 @@ func (u Monitor) monitorContainer(request *restful.Request, response *restful.Re
 func (u Monitor) monitorWorkload(request *restful.Request, response *restful.Response) {
 	requestParams := client.ParseMonitoringRequestParams(request)
 	wlKind := requestParams.WorkloadKind
+	tp := requestParams.Tp
 	if wlKind == "" {
 		// count all workloads figure
-		res := metrics.MonitorWorkloadCount(requestParams.NsName)
-		response.WriteAsJson(res)
+		if tp == "rank" {
+			rawMetrics := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkload)
+			// sorting
+			sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics, metrics.MetricLevelWorkload)
+			// paging
+			pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
+
+			response.WriteAsJson(pagedMetrics)
+		} else {
+			res := metrics.MonitorWorkloadCount(requestParams.NsName)
+			response.WriteAsJson(res)
+		}
 	} else {
 		res := metrics.MonitorAllMetrics(requestParams, metrics.MetricLevelWorkload)
 		response.WriteAsJson(res)
@@ -349,6 +361,12 @@ func Register(ws *restful.WebService, subPath string) {
 		Doc("monitor all workload level metrics").
 		Param(ws.PathParameter("ns_name", "namespace").DataType("string").Required(true).DefaultValue("kube-system")).
 		Param(ws.QueryParameter("metrics_filter", "metrics name cpu memory...").DataType("string").Required(false)).
+		Param(ws.QueryParameter("workloads_filter", "pod name filter in re2 regex").DataType("string").Required(false).DefaultValue("")).
+		Param(ws.QueryParameter("sort_metric", "metric to sort by").DataType("string").Required(false)).
+		Param(ws.QueryParameter("sort_type", "ascending or descending order").DataType("string").Required(false)).
+		Param(ws.QueryParameter("page", "page number").DataType("string").Required(false).DefaultValue("1")).
+		Param(ws.QueryParameter("limit", "max number of metric items per page").DataType("string").Required(false).DefaultValue("4")).
+		Param(ws.QueryParameter("type", "rank, statistic").DataType("string").Required(false).DefaultValue("rank")).
 		Metadata(restfulspec.KeyOpenAPITags, tags)).
 	Consumes(restful.MIME_JSON, restful.MIME_XML).
 	Produces(restful.MIME_JSON)
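The rank branch above hands the raw metrics to metrics.Sort and metrics.Page, neither of which appears in this diff. As a rough sketch of the paging step only, with simplified types (the real metrics.Page also receives maxMetricCount and the request's page/limit values, whose types this diff does not show), it could look like:

package main

import "fmt"

// page slices one 1-based page of size limit out of an already-sorted
// result set. Illustrative sketch; names and types are assumptions,
// not the implementation the handler above calls.
func page(pageNum, limit int, sorted []string) []string {
	start := (pageNum - 1) * limit
	if pageNum < 1 || limit < 1 || start >= len(sorted) {
		return nil
	}
	end := start + limit
	if end > len(sorted) {
		end = len(sorted)
	}
	return sorted[start:end]
}

func main() {
	workloads := []string{"wl-a", "wl-b", "wl-c", "wl-d", "wl-e"} // already sorted
	fmt.Println(page(2, 4, workloads))                            // [wl-e]
}

The default limit of 4 matches the DefaultValue of the new "limit" query parameter, so page 2 of five workloads holds a single item.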
diff --git a/pkg/client/prometheusclient.go b/pkg/client/prometheusclient.go
index c879db3c..32d9d1e2 100644
--- a/pkg/client/prometheusclient.go
+++ b/pkg/client/prometheusclient.go
@@ -63,6 +63,7 @@ type MonitoringRequestParams struct {
 	ContainersFilter string
 	MetricsName      string
 	WorkloadName     string
+	WlFilter         string
 	NodeId           string
 	WsName           string
 	NsName           string
@@ -108,6 +109,7 @@ func ParseMonitoringRequestParams(request *restful.Request) *MonitoringRequestPa
 	nodesFilter := strings.Trim(request.QueryParameter("nodes_filter"), " ")
 	wsFilter := strings.Trim(request.QueryParameter("workspaces_filter"), " ")
 	nsFilter := strings.Trim(request.QueryParameter("namespaces_filter"), " ")
+	wlFilter := strings.Trim(request.QueryParameter("workloads_filter"), " ")
 	podsFilter := strings.Trim(request.QueryParameter("pods_filter"), " ")
 	containersFilter := strings.Trim(request.QueryParameter("containers_filter"), " ")
@@ -135,6 +137,7 @@
 		ContainersFilter: containersFilter,
 		MetricsName:      metricsName,
 		WorkloadName:     workloadName,
+		WlFilter:         wlFilter,
 		NodeId:           nodeId,
 		WsName:           wsName,
 		NsName:           nsName,

diff --git a/pkg/models/metrics/metrics.go b/pkg/models/metrics/metrics.go
index 1d839d9e..46375167 100644
--- a/pkg/models/metrics/metrics.go
+++ b/pkg/models/metrics/metrics.go
@@ -120,6 +120,51 @@ type OneComponentStatus struct {
 	Error string `json:"error,omitempty"`
 }
 
+func renameWorkload(formatedMetric *FormatedMetric, relationMap map[string]string) {
+	if formatedMetric.Status == MetricStatusSuccess {
+		for i := 0; i < len(formatedMetric.Data.Result); i++ {
+			metricDesc := formatedMetric.Data.Result[i][ResultItemMetric]
+			metricDescMap, ensure := metricDesc.(map[string]interface{})
+			if ensure {
+				if wl, exist := metricDescMap[MetricLevelWorkload]; exist {
+					if deployName, exist := relationMap[wl.(string)]; exist {
+						metricDescMap[MetricLevelWorkload] = deployName
+					}
+				}
+			}
+		}
+	}
+}
+
+func getReplicaAndDeployRelation(nsName string) map[string]string {
+	rule := strings.Replace(WorkloadReplicaSetOwnerRule, "$1", nsName, -1)
+
+	params := makeRequestParamString(rule, make(url.Values))
+
+	res := client.SendMonitoringRequest(client.DefaultQueryType, params)
+	formatedMetric := ReformatJson(res, "")
+	var relationMap = make(map[string]string)
+	if formatedMetric.Status == MetricStatusSuccess {
+		for i := 0; i < len(formatedMetric.Data.Result); i++ {
+			metricDesc := formatedMetric.Data.Result[i][ResultItemMetric]
+			metricDescMap, ensure := metricDesc.(map[string]interface{})
+			if ensure {
+				if ownerKind, exist := metricDescMap["owner_kind"]; exist && ownerKind == ReplicaSet {
+					if ownerName, exist := metricDescMap["owner_name"]; exist {
+						replicaName, sure := ownerName.(string)
+						if sure {
+							// a Deployment-owned ReplicaSet is named "<deployment>-<hash>";
+							// strip the trailing hash to recover the Deployment name
+							deployName := replicaName[:strings.LastIndex(replicaName, "-")]
+							relationMap[replicaName] = deployName
+						}
+					}
+				}
+			}
+		}
+	}
+	return relationMap
+}
+
 func getPodNameRegexInWorkload(res string) string {
 	data := []byte(res)
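The name derivation in getReplicaAndDeployRelation is easiest to see with concrete values. A self-contained example with hypothetical names:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// A Deployment-owned ReplicaSet is named "<deployment>-<pod-template-hash>".
	// Both names below are hypothetical.
	replicaName := "redis-master-7b5cf9d9c4"
	deployName := replicaName[:strings.LastIndex(replicaName, "-")]
	fmt.Println(deployName) // redis-master
}

If a name somehow contained no "-", strings.LastIndex would return -1 and the slice expression would panic; Deployment-owned ReplicaSet names always contain one, which is what the owner_kind="ReplicaSet" guard relies on.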
@@ -192,12 +237,12 @@ func unifyMetricHistoryTimeRange(fmtMetrics *FormatedMetric) {
 	}
 }
 
-func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) {
+func AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) {
 	nsName := monitoringRequest.NsName
 	wkName := monitoringRequest.WorkloadName
 
-	rule := MakeWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName)
+	rule := MakeSpecificWorkloadRule(monitoringRequest.WorkloadKind, wkName, nsName)
 
 	paramValues := monitoringRequest.Params
 	params := makeRequestParamString(rule, paramValues)
@@ -212,6 +257,16 @@ func AssembleWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringReque
 	return queryType, params
 }
 
+func AssembleAllWorkloadMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string) {
+	queryType := monitoringRequest.QueryType
+
+	paramValues := monitoringRequest.Params
+
+	rule := MakeWorkloadPromQL(metricName, monitoringRequest.NsName, monitoringRequest.WlFilter)
+	params := makeRequestParamString(rule, paramValues)
+	return queryType, params
+}
+
 func AssemblePodMetricRequestInfo(monitoringRequest *client.MonitoringRequestParams, metricName string) (string, string, bool) {
 	queryType := monitoringRequest.QueryType
@@ -590,18 +645,39 @@ func MonitorAllMetrics(monitoringRequest *client.MonitoringRequestParams, resour
 		}
 	case MetricLevelWorkload:
 		{
-			for _, metricName := range WorkloadMetricsNames {
-				bol, err := regexp.MatchString(metricsFilter, metricName)
-				if err == nil && bol {
-					wg.Add(1)
-					go func(metricName string) {
-						metricName = strings.TrimLeft(metricName, "workload_")
-						queryType, params := AssembleWorkloadMetricRequestInfo(monitoringRequest, metricName)
-						fmtMetrics := GetMetric(queryType, params, metricName)
-						unifyMetricHistoryTimeRange(fmtMetrics)
-						ch <- fmtMetrics
-						wg.Done()
-					}(metricName)
+			if monitoringRequest.Tp == "rank" {
+				// get relationship between replicaset and deployment
+				relationMap := getReplicaAndDeployRelation(monitoringRequest.NsName)
+				for _, metricName := range WorkloadMetricsNames {
+					bol, err := regexp.MatchString(metricsFilter, metricName)
+					if err == nil && bol {
+						wg.Add(1)
+						go func(metricName string) {
+							queryType, params := AssembleAllWorkloadMetricRequestInfo(monitoringRequest, metricName)
+							fmtMetrics := GetMetric(queryType, params, metricName)
+
+							// rename replica workload name
+							renameWorkload(fmtMetrics, relationMap)
+
+							ch <- fmtMetrics
+							wg.Done()
+						}(metricName)
+					}
+				}
+			} else {
+				for _, metricName := range WorkloadMetricsNames {
+					bol, err := regexp.MatchString(metricsFilter, metricName)
+					if err == nil && bol {
+						wg.Add(1)
+						go func(metricName string) {
+							// TrimPrefix, not TrimLeft: trim the literal "workload_" prefix,
+							// not a character set
+							metricName = strings.TrimPrefix(metricName, "workload_")
+							queryType, params := AssembleSpecificWorkloadMetricRequestInfo(monitoringRequest, metricName)
+							fmtMetrics := GetMetric(queryType, params, metricName)
+							unifyMetricHistoryTimeRange(fmtMetrics)
+							ch <- fmtMetrics
+							wg.Done()
+						}(metricName)
+					}
 				}
 			}
 		}

diff --git a/pkg/models/metrics/metricsrule.go b/pkg/models/metrics/metricsrule.go
index b0fe506a..e575696b 100755
--- a/pkg/models/metrics/metricsrule.go
+++ b/pkg/models/metrics/metricsrule.go
@@ -17,7 +17,18 @@ import (
 	"strings"
 )
 
-func MakeWorkloadRule(wkKind, wkName, namespace string) string {
+func MakeWorkloadPromQL(metricName, nsName, wlFilter string) string {
+	if wlFilter == "" {
+		wlFilter = ".*"
+	}
+
+	var promql = RulePromQLTmplMap[metricName]
+	promql = strings.Replace(promql, "$2", nsName, -1)
+	promql = strings.Replace(promql, "$3", wlFilter, -1)
+	return promql
+}
+
+func MakeSpecificWorkloadRule(wkKind, wkName, namespace string) string {
 	var rule = PodInfoRule
 	if namespace == "" {
 		namespace = ".*"

diff --git a/pkg/models/metrics/metricsruleconst.go b/pkg/models/metrics/metricsruleconst.go
index 5c5ac1f5..82332386 100644
--- a/pkg/models/metrics/metricsruleconst.go
+++ b/pkg/models/metrics/metricsruleconst.go
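MakeWorkloadPromQL is plain string templating: $2 is replaced with the namespace and $3 with the workload (pod name) filter, which falls back to ".*" when empty. A trimmed-down, self-contained rendering with a hypothetical template and inputs:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Abbreviated stand-in for a RulePromQLTmplMap entry; the real workload
	// templates are much longer. $2 = namespace, $3 = pod name filter.
	tmpl := `sum(irate(container_cpu_usage_seconds_total{namespace="$2", pod_name=~"$3"}[5m]))`

	wlFilter := "" // empty filter falls back to ".*", as in MakeWorkloadPromQL
	if wlFilter == "" {
		wlFilter = ".*"
	}
	promql := strings.Replace(tmpl, "$2", "default", -1)
	promql = strings.Replace(promql, "$3", wlFilter, -1)
	fmt.Println(promql)
	// sum(irate(container_cpu_usage_seconds_total{namespace="default", pod_name=~".*"}[5m]))
}

Note that $1 is deliberately left alone: inside the workload templates below it is a label_replace capture-group reference, not a template placeholder, which is why MakeWorkloadPromQL only substitutes $2 and $3.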
@@ -73,9 +73,10 @@ const (
 )
 
 const (
-	NodeStatusRule     = `kube_node_status_condition{condition="Ready"} > 0`
-	PodInfoRule        = `kube_pod_info{created_by_kind="$1",created_by_name=$2,namespace="$3"}`
-	NamespaceLabelRule = `kube_namespace_labels`
+	NodeStatusRule              = `kube_node_status_condition{condition="Ready"} > 0`
+	PodInfoRule                 = `kube_pod_info{created_by_kind="$1",created_by_name=$2,namespace="$3"}`
+	NamespaceLabelRule          = `kube_namespace_labels`
+	WorkloadReplicaSetOwnerRule = `kube_pod_owner{namespace="$1", owner_name!="", owner_kind="ReplicaSet"}`
 )
 
 const (
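The new WorkloadReplicaSetOwnerRule is consumed by getReplicaAndDeployRelation, which substitutes the namespace for $1. A minimal rendering with a hypothetical namespace:

package main

import (
	"fmt"
	"strings"
)

func main() {
	const WorkloadReplicaSetOwnerRule = `kube_pod_owner{namespace="$1", owner_name!="", owner_kind="ReplicaSet"}`
	// Substitute the namespace the same way getReplicaAndDeployRelation does.
	rule := strings.Replace(WorkloadReplicaSetOwnerRule, "$1", "default", -1)
	fmt.Println(rule)
	// kube_pod_owner{namespace="default", owner_name!="", owner_kind="ReplicaSet"}
}

In the template changes that follow, the recurring extra matcher pod_name!="" is not redundant in the regex variants: PromQL allows several matchers on one label, and pod_name=~".*" also matches cadvisor series that carry no pod_name label at all, so the pair keeps only real per-pod series.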
pod_name=~"$2", interface="eth0", job="kubelet"}[5m]))`, "pod_net_bytes_received_all": `sum by (namespace, pod_name) (irate(container_network_receive_bytes_total{namespace="$1", pod_name!="", pod_name=~"$2", interface="eth0", job="kubelet"}[5m]))`, - "pod_cpu_usage_node": `sum by (node, pod) (label_join(irate(container_cpu_usage_seconds_total{job="kubelet",pod_name=~"$2", image!=""}[5m]), "pod", " ", "pod_name") * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{node=~"$3"})`, - "pod_memory_usage_node": `sum by (node, pod) (label_join(container_memory_usage_bytes{job="kubelet",pod_name=~"$2", image!=""}, "pod", " ", "pod_name") * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{node=~"$3"})`, - "pod_memory_usage_wo_cache_node": `sum by (node, pod) ((label_join(container_memory_usage_bytes{job="kubelet",pod_name=~"$2", image!=""}, "pod", " ", "pod_name") - label_join(container_memory_cache{job="kubelet",pod_name=~"$2", image!=""}, "pod", " ", "pod_name")) * on (namespace, pod) group_left(node) node_namespace_pod:kube_pod_info:{node=~"$3"})`, + "pod_cpu_usage_node": `sum by (node, pod_name) (irate(container_cpu_usage_seconds_total{job="kubelet",pod_name!="", pod_name=~"$2", image!=""}[5m]) * on (namespace, pod_name) group_left(node) label_join(node_namespace_pod:kube_pod_info:{node=~"$3"}, "pod_name", "", "pod", "_name"))`, + "pod_memory_usage_node": `sum by (node, pod_name) (container_memory_usage_bytes{job="kubelet",pod_name!="", pod_name=~"$2", image!=""} * on (namespace, pod_name) group_left(node) label_join(node_namespace_pod:kube_pod_info:{node=~"$3"}, "pod_name", "", "pod", "_name"))`, + "pod_memory_usage_wo_cache_node": `sum by (node, pod_name) ((container_memory_usage_bytes{job="kubelet",pod_name!="", pod_name=~"$2", image!=""} - container_memory_cache{job="kubelet",pod_name!="", pod_name=~"$2", image!=""}) * on (namespace, pod_name) group_left(node) label_join(node_namespace_pod:kube_pod_info:{node=~"$3"}, "pod_name", "", "pod", "_name"))`, + "pod_net_bytes_transmitted_node": `sum by (node, pod_name) (irate(container_network_transmit_bytes_total{pod_name!="", pod_name=~"$2", interface="eth0", job="kubelet"}[5m]) * on (pod_name) group_left(node) label_join(node_namespace_pod:kube_pod_info:{node=~"$3"}, "pod_name", "", "pod", "_name"))`, + "pod_net_bytes_received_node": `sum by (node, pod_name) (irate(container_network_receive_bytes_total{pod_name!="", pod_name=~"$2", interface="eth0", job="kubelet"}[5m]) * on (pod_name) group_left(node) label_join(node_namespace_pod:kube_pod_info:{node=~"$3"}, "pod_name", "", "pod", "_name"))`, + + // workload + "workload_pod_cpu_usage": `sum(label_replace(sum(irate(container_cpu_usage_seconds_total{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""}[5m])) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`, + "workload_pod_memory_usage": `sum(label_replace(sum(container_memory_usage_bytes{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""}) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`, + "workload_pod_memory_usage_wo_cache": `sum(label_replace(sum(container_memory_usage_bytes{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""} - 
+
+	// workload
+	"workload_pod_cpu_usage":             `sum(label_replace(sum(irate(container_cpu_usage_seconds_total{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""}[5m])) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`,
+	"workload_pod_memory_usage":          `sum(label_replace(sum(container_memory_usage_bytes{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""}) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`,
+	"workload_pod_memory_usage_wo_cache": `sum(label_replace(sum(container_memory_usage_bytes{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""} - container_memory_cache{job="kubelet", namespace="$2", pod_name!="", pod_name=~"$3", image!=""}) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`,
+	"workload_pod_net_bytes_transmitted": `sum(label_replace(sum(irate(container_network_transmit_bytes_total{namespace="$2", pod_name!="", pod_name=~"$3", interface="eth0", job="kubelet"}[5m])) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`,
+	"workload_pod_net_bytes_received":    `sum(label_replace(sum(irate(container_network_receive_bytes_total{namespace="$2", pod_name!="", pod_name=~"$3", interface="eth0", job="kubelet"}[5m])) by (namespace, pod_name) and on (pod_name) label_join(kube_pod_owner{namespace="$2", owner_name!=""}, "pod_name", "", "pod", "_name"), "workload", "$1", "pod_name", "(.+)-(.+)")) by (namespace, workload)`,
 
 	// container
 	"container_cpu_usage": `sum(irate(container_cpu_usage_seconds_total{namespace="$1", pod_name="$2", container_name="$3"}[5m])) by (namespace, pod_name, container_name)`,
-- 
GitLab
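The workload_pod_* templates above first restrict cadvisor series to pods that have an owner (the `and on (pod_name) ... kube_pod_owner` filter), then use label_replace with the RE2 pattern (.+)-(.+) to turn each pod name into a workload label. Because the first (.+) is greedy, the match strips only the final -<suffix> segment, which for a Deployment pod yields the owning ReplicaSet's name; renameWorkload then maps that name to the Deployment. A self-contained check of the regex behaviour, using a hypothetical pod name (Go's regexp is RE2, the same engine Prometheus uses):

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// label_replace(..., "workload", "$1", "pod_name", "(.+)-(.+)") keeps
	// capture group 1; Prometheus anchors the pattern to the whole value.
	re := regexp.MustCompile(`^(.+)-(.+)$`)
	pod := "redis-master-7b5cf9d9c4-x2x3q" // hypothetical Deployment pod
	fmt.Println(re.ReplaceAllString(pod, "$1"))
	// redis-master-7b5cf9d9c4  (the owning ReplicaSet's name)
}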