未验证 提交 df6ed5e9 编写于 作者: H huanggze

refactor monitor module

Signed-off-by: Nhuanggze <loganhuang@yunify.com>
上级 97c9a178
......@@ -18,393 +18,186 @@
package monitoring
import (
"fmt"
"github.com/emicklei/go-restful"
"k8s.io/klog"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/models/metrics"
"kubesphere.io/kubesphere/pkg/simple/client"
"net/url"
"strconv"
"strings"
"time"
)
func MonitorAllPodsOfSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorPod(request, response)
}
func MonitorSpecificPodOfSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorPod(request, response)
}
func MonitorAllPodsOnSpecificNode(request *restful.Request, response *restful.Response) {
MonitorPod(request, response)
}
func MonitorSpecificPodOnSpecificNode(request *restful.Request, response *restful.Response) {
MonitorPod(request, response)
}
func MonitorPod(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
podName := requestParams.PodName
if podName != "" {
requestParams.ResourcesFilter = fmt.Sprintf("^%s$", requestParams.PodName)
}
rawMetrics := metrics.GetPodLevelMetrics(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
}
func MonitorAllContainersOnSpecificNode(request *restful.Request, response *restful.Response) {
MonitorContainer(request, response)
}
func MonitorAllContainersOfSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorContainer(request, response)
}
func MonitorSpecificContainerOfSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorContainer(request, response)
}
func MonitorContainer(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
rawMetrics := metrics.GetContainerLevelMetrics(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
}
func MonitorSpecificWorkload(request *restful.Request, response *restful.Response) {
MonitorWorkload(request, response)
}
func MonitorAllWorkloadsOfSpecificKind(request *restful.Request, response *restful.Response) {
MonitorWorkload(request, response)
}
func MonitorAllWorkloadsOfSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorWorkload(request, response)
}
func MonitorWorkload(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
rawMetrics := metrics.GetWorkloadLevelMetrics(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
}
func MonitorAllWorkspaces(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
tp := requestParams.Tp
if tp == "statistics" {
// merge multiple metric: all-devops, all-roles, all-projects...this api is designed for admin
res := metrics.GetAllWorkspacesStatistics()
response.WriteAsJson(res)
} else {
rawMetrics := metrics.MonitorAllWorkspaces(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
}
}
func MonitorSpecificWorkspace(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
tp := requestParams.Tp
if tp == "rank" {
// multiple
rawMetrics := metrics.GetWorkspaceLevelMetrics(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
} else if tp == "statistics" {
wsName := requestParams.WsName
func MonitorCluster(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
// merge multiple metric: devops, roles, projects...
res := metrics.MonitorOneWorkspaceStatistics(wsName)
response.WriteAsJson(res)
// TODO: expose kubesphere iam and devops statistics in prometheus format
var res *metrics.Response
if r.Type == "statistics" {
res = metrics.GetClusterStatistics()
} else {
res := metrics.GetWorkspaceLevelMetrics(requestParams)
response.WriteAsJson(res)
res = metrics.GetClusterMetrics(r)
}
}
func MonitorAllNamespaces(request *restful.Request, response *restful.Response) {
MonitorNamespace(request, response)
response.WriteAsJson(res)
}
func MonitorSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorNamespace(request, response)
}
func MonitorNamespace(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
// multiple
rawMetrics := metrics.GetNamespaceLevelMetrics(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
func MonitorNode(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
res := metrics.GetNodeMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
response.WriteAsJson(res)
}
func MonitorCluster(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
metricName := requestParams.MetricsName
if metricName != "" {
prometheusClient, err := client.ClientSets().Prometheus()
if err != nil {
if _, ok := err.(client.ClientSetNotEnabledError); ok {
klog.Error("monitoring is not enabled")
return
} else {
klog.Errorf("get prometheus client failed %+v", err)
}
}
// single
queryType, params := metrics.AssembleClusterMetricRequestInfo(requestParams, metricName)
metricsStr := prometheusClient.SendMonitoringRequest(queryType, params)
res := metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelCluster: "local"})
func MonitorWorkspace(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
response.WriteAsJson(res)
// TODO: expose kubesphere iam and devops statistics in prometheus format
var res *metrics.Response
if r.Type == "statistics" && r.WorkspaceName != "" {
res = metrics.GetWorkspaceStatistics(r.WorkspaceName)
} else {
// multiple
res := metrics.GetClusterLevelMetrics(requestParams)
response.WriteAsJson(res)
res = metrics.GetWorkspaceMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
}
}
func MonitorAllNodes(request *restful.Request, response *restful.Response) {
MonitorNode(request, response)
response.WriteAsJson(res)
}
func MonitorSpecificNode(request *restful.Request, response *restful.Response) {
MonitorNode(request, response)
}
func MonitorNode(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
metricName := requestParams.MetricsName
if metricName != "" {
prometheusClient, err := client.ClientSets().Prometheus()
if err != nil {
if _, ok := err.(client.ClientSetNotEnabledError); ok {
klog.Error("monitoring is not enabled")
return
} else {
klog.Errorf("get prometheus client failed %+v", err)
}
}
// single
queryType, params := metrics.AssembleNodeMetricRequestInfo(requestParams, metricName)
metricsStr := prometheusClient.SendMonitoringRequest(queryType, params)
res := metrics.ReformatJson(metricsStr, metricName, map[string]string{metrics.MetricLevelNode: ""})
// The raw node-exporter result doesn't include ip address information
// Thereby, append node ip address to .data.result[].metric
nodeAddress := metrics.GetNodeAddressInfo()
metrics.AddNodeAddressMetric(res, nodeAddress)
response.WriteAsJson(res)
} else {
// multiple
rawMetrics := metrics.GetNodeLevelMetrics(requestParams)
nodeAddress := metrics.GetNodeAddressInfo()
for i := 0; i < len(rawMetrics.Results); i++ {
metrics.AddNodeAddressMetric(&rawMetrics.Results[i], nodeAddress)
}
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
}
func MonitorNamespace(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
res := metrics.GetNamespaceMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
response.WriteAsJson(res)
}
func MonitorAllPVCsOfSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorPVC(request, response)
func MonitorWorkload(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
res := metrics.GetWorkloadMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
response.WriteAsJson(res)
}
func MonitorAllPVCsOfSpecificStorageClass(request *restful.Request, response *restful.Response) {
MonitorPVC(request, response)
func MonitorPod(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
res := metrics.GetPodMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
response.WriteAsJson(res)
}
func MonitorSpecificPVCofSpecificNamespace(request *restful.Request, response *restful.Response) {
MonitorPVC(request, response)
func MonitorContainer(request *restful.Request, response *restful.Response) {
r := ParseRequestParams(request)
res := metrics.GetContainerMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
response.WriteAsJson(res)
}
func MonitorPVC(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
pvcName := requestParams.PVCName
if pvcName != "" {
requestParams.ResourcesFilter = fmt.Sprintf("^%s$", requestParams.PVCName)
}
rawMetrics := metrics.GetPVCLevelMetrics(requestParams)
// sorting
sortedMetrics, maxMetricCount := metrics.Sort(requestParams.SortMetricName, requestParams.SortType, rawMetrics)
// paging
pagedMetrics := metrics.Page(requestParams.PageNum, requestParams.LimitNum, sortedMetrics, maxMetricCount)
response.WriteAsJson(pagedMetrics)
r := ParseRequestParams(request)
res := metrics.GetPVCMetrics(r)
res, metricsNum := res.SortBy(r.SortMetric, r.SortType)
res = res.Page(r.PageNum, r.LimitNum, metricsNum)
response.WriteAsJson(res)
}
func MonitorComponent(request *restful.Request, response *restful.Response) {
requestParams := ParseMonitoringRequestParams(request)
if requestParams.MetricsFilter == "" {
requestParams.MetricsFilter = requestParams.ComponentName + "_.*"
}
rawMetrics := metrics.GetComponentLevelMetrics(requestParams)
response.WriteAsJson(rawMetrics)
r := ParseRequestParams(request)
res := metrics.GetComponentMetrics(r)
response.WriteAsJson(res)
}
func ParseMonitoringRequestParams(request *restful.Request) *metrics.MonitoringRequestParams {
instantTime := strings.Trim(request.QueryParameter("time"), " ")
func ParseRequestParams(request *restful.Request) metrics.RequestParams {
var requestParams metrics.RequestParams
queryTime := strings.Trim(request.QueryParameter("time"), " ")
start := strings.Trim(request.QueryParameter("start"), " ")
end := strings.Trim(request.QueryParameter("end"), " ")
step := strings.Trim(request.QueryParameter("step"), " ")
timeout := strings.Trim(request.QueryParameter("timeout"), " ")
sortMetricName := strings.Trim(request.QueryParameter("sort_metric"), " ")
sortMetric := strings.Trim(request.QueryParameter("sort_metric"), " ")
sortType := strings.Trim(request.QueryParameter("sort_type"), " ")
pageNum := strings.Trim(request.QueryParameter("page"), " ")
limitNum := strings.Trim(request.QueryParameter("limit"), " ")
tp := strings.Trim(request.QueryParameter("type"), " ")
metricsFilter := strings.Trim(request.QueryParameter("metrics_filter"), " ")
resourcesFilter := strings.Trim(request.QueryParameter("resources_filter"), " ")
metricsName := strings.Trim(request.QueryParameter("metrics_name"), " ")
nodeName := strings.Trim(request.PathParameter("node"), " ")
workspaceName := strings.Trim(request.PathParameter("workspace"), " ")
namespaceName := strings.Trim(request.PathParameter("namespace"), " ")
workloadKind := strings.Trim(request.PathParameter("kind"), " ")
workloadName := strings.Trim(request.PathParameter("workload"), " ")
nodeId := strings.Trim(request.PathParameter("node"), " ")
wsName := strings.Trim(request.PathParameter("workspace"), " ")
nsName := strings.Trim(request.PathParameter("namespace"), " ")
podName := strings.Trim(request.PathParameter("pod"), " ")
containerName := strings.Trim(request.PathParameter("container"), " ")
pvcName := strings.Trim(request.PathParameter("pvc"), " ")
storageClassName := strings.Trim(request.PathParameter("storageclass"), " ")
containerName := strings.Trim(request.PathParameter("container"), " ")
workloadKind := strings.Trim(request.PathParameter("kind"), " ")
componentName := strings.Trim(request.PathParameter("component"), " ")
var requestParams = metrics.MonitoringRequestParams{
SortMetricName: sortMetricName,
requestParams = metrics.RequestParams{
SortMetric: sortMetric,
SortType: sortType,
PageNum: pageNum,
LimitNum: limitNum,
Tp: tp,
Type: tp,
MetricsFilter: metricsFilter,
ResourcesFilter: resourcesFilter,
MetricsName: metricsName,
NodeName: nodeName,
WorkspaceName: workspaceName,
NamespaceName: namespaceName,
WorkloadKind: workloadKind,
WorkloadName: workloadName,
NodeId: nodeId,
WsName: wsName,
NsName: nsName,
PodName: podName,
ContainerName: containerName,
PVCName: pvcName,
StorageClassName: storageClassName,
ContainerName: containerName,
WorkloadKind: workloadKind,
ComponentName: componentName,
}
if timeout == "" {
timeout = metrics.DefaultQueryTimeout
if metricsFilter == "" {
requestParams.MetricsFilter = ".*"
}
if step == "" {
step = metrics.DefaultQueryStep
if resourcesFilter == "" {
requestParams.ResourcesFilter = ".*"
}
// Whether query or query_range request
u := url.Values{}
if start != "" && end != "" {
v := url.Values{}
u.Set("start", convertTimeGranularity(start))
u.Set("end", convertTimeGranularity(end))
u.Set("step", step)
u.Set("timeout", timeout)
if start != "" && end != "" { // range query
// range query start time must be greater than the namespace creation time
if nsName != "" {
// metrics from a deleted namespace should be hidden
// therefore, for range query, if range query start time is less than the namespace creation time, set it to creation time
// it is the same with query at a fixed time point
if namespaceName != "" {
nsLister := informers.SharedInformerFactory().Core().V1().Namespaces().Lister()
ns, err := nsLister.Get(nsName)
ns, err := nsLister.Get(namespaceName)
if err == nil {
queryStartTime := u.Get("start")
nsCreationTime := strconv.FormatInt(ns.CreationTimestamp.Unix(), 10)
if nsCreationTime > queryStartTime {
u.Set("start", nsCreationTime)
creationTime := ns.CreationTimestamp.Time.Unix()
queryStart, err := strconv.ParseInt(start, 10, 64)
if err == nil && queryStart < creationTime {
start = strconv.FormatInt(creationTime, 10)
}
}
}
requestParams.QueryType = metrics.RangeQueryType
requestParams.Params = u
v.Set("start", start)
v.Set("end", end)
return &requestParams
}
if instantTime != "" {
u.Set("time", instantTime)
u.Set("timeout", timeout)
requestParams.QueryType = metrics.DefaultQueryType
requestParams.Params = u
return &requestParams
} else {
u.Set("timeout", timeout)
requestParams.QueryType = metrics.DefaultQueryType
requestParams.Params = u
return &requestParams
}
}
if step == "" {
v.Set("step", metrics.DefaultQueryStep)
} else {
v.Set("step", step)
}
requestParams.QueryParams = v
requestParams.QueryType = metrics.RangeQuery
func convertTimeGranularity(ts string) string {
timeFloat, err := strconv.ParseFloat(ts, 64)
if err != nil {
klog.Errorf("convert second timestamp %s to minute timestamp failed", ts)
return strconv.FormatInt(int64(time.Now().Unix()), 10)
return requestParams
} else if queryTime != "" { // query
v.Set("time", queryTime)
}
timeInt := int64(timeFloat)
// convert second timestamp to minute timestamp
secondTime := time.Unix(timeInt, 0).Truncate(time.Minute).Unix()
return strconv.FormatInt(secondTime, 10)
requestParams.QueryParams = v
requestParams.QueryType = metrics.Query
return requestParams
}
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
const (
MonitorLevelCluster = "cluster"
MonitorLevelNode = "node"
MonitorLevelWorkspace = "workspace"
MonitorLevelNamespace = "namespace"
MonitorLevelPod = "pod"
MonitorLevelContainer = "container"
MonitorLevelPVC = "pvc"
MonitorLevelWorkload = "workload"
MonitorLevelComponent = "component"
ChannelMaxCapacity = 100
// prometheus query type
RangeQuery = "query_range"
Query = "query"
DefaultQueryStep = "10m"
StatefulSet = "StatefulSet"
DaemonSet = "DaemonSet"
Deployment = "Deployment"
)
此差异已折叠。
此差异已折叠。
/*
Copyright 2018 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"strings"
)
// resources_filter = xxxx|xxxx
func MakeWorkloadPromQL(metricName, nsName, resources_filter, wkKind string) string {
switch wkKind {
case "deployment":
wkKind = Deployment
case "daemonset":
wkKind = DaemonSet
case "statefulset":
wkKind = StatefulSet
}
if wkKind == "" {
resources_filter = Any
} else if resources_filter == "" {
if strings.Contains(metricName, "pod") {
resources_filter = wkKind + ":" + Any
} else if strings.Contains(metricName, strings.ToLower(wkKind)) {
resources_filter = Any
}
} else {
var prefix string
// The "workload_{deployment,statefulset,daemonset}_xxx" metric uses "deployment","statefulset" or "daemonset" label selectors
// which match exactly a workload name
// eg. kube_daemonset_status_number_unavailable{daemonset=~"^xxx$"}
if strings.Contains(metricName, "deployment") || strings.Contains(metricName, "daemonset") || strings.Contains(metricName, "statefulset") {
// to pass "resources_filter" to PromQL, we reformat it
prefix = ""
} else {
// While workload_{cpu,memory,net}_xxx metrics uses "workload"
// eg. namespace:workload_cpu_usage:sum{workload="Deployment:xxx"}
prefix = wkKind + ":"
}
filters := strings.Split(resources_filter, "|")
// reshape it to match PromQL re2 syntax
resources_filter = ""
for i, filter := range filters {
resources_filter += "^" + prefix + filter + "$" // eg. ^Deployment:xxx$
if i != len(filters)-1 {
resources_filter += "|"
}
}
}
var promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$2", nsName, -1)
promql = strings.Replace(promql, "$3", resources_filter, -1)
return promql
}
func MakeSpecificWorkloadRule(wkKind, wkName, namespace string) string {
var rule = PodInfoRule
if namespace == "" {
namespace = ".*"
}
// alertnatives values: Deployment StatefulSet ReplicaSet DaemonSet
wkKind = strings.ToLower(wkKind)
switch wkKind {
case "deployment":
wkKind = ReplicaSet
if wkName != "" {
wkName = "~\"^" + wkName + `-(\\w)+$"`
} else {
wkName = "~\".*\""
}
rule = strings.Replace(rule, "$1", wkKind, -1)
rule = strings.Replace(rule, "$2", wkName, -1)
rule = strings.Replace(rule, "$3", namespace, -1)
return rule
case "replicaset":
wkKind = ReplicaSet
case "statefulset":
wkKind = StatefulSet
case "daemonset":
wkKind = DaemonSet
}
if wkName == "" {
wkName = "~\".*\""
} else {
wkName = "\"" + wkName + "\""
}
rule = strings.Replace(rule, "$1", wkKind, -1)
rule = strings.Replace(rule, "$2", wkName, -1)
rule = strings.Replace(rule, "$3", namespace, -1)
return rule
}
func MakeAllWorkspacesPromQL(metricsName, nsFilter string) string {
var promql = RulePromQLTmplMap[metricsName]
nsFilter = "!~\"" + nsFilter + "\""
promql = strings.Replace(promql, "$1", nsFilter, -1)
return promql
}
func MakeSpecificWorkspacePromQL(metricsName, nsFilter string, workspace string) string {
var promql = RulePromQLTmplMap[metricsName]
nsFilter = "=~\"" + nsFilter + "\""
workspace = "=~\"^(" + workspace + ")$\""
promql = strings.Replace(promql, "$1", nsFilter, -1)
promql = strings.Replace(promql, "$2", workspace, -1)
return promql
}
func MakeContainerPromQL(nsName, nodeId, podName, containerName, metricName, containerFilter string) string {
var promql string
if nsName != "" {
// get container metrics from namespace-pod
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$1", nsName, -1)
} else {
// get container metrics from node-pod
promql = RulePromQLTmplMap[metricName+"_node"]
promql = strings.Replace(promql, "$1", nodeId, -1)
}
promql = strings.Replace(promql, "$2", podName, -1)
if containerName == "" {
if containerFilter == "" {
containerFilter = ".*"
}
promql = strings.Replace(promql, "$3", containerFilter, -1)
} else {
promql = strings.Replace(promql, "$3", containerName, -1)
}
return promql
}
func MakePodPromQL(metricName, nsName, nodeID, podName, podFilter string) string {
if podFilter == "" {
podFilter = ".*"
}
var promql = ""
if nsName != "" {
// get pod metrics by namespace
if podName != "" {
// specific pod
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$1", nsName, -1)
promql = strings.Replace(promql, "$2", podName, -1)
} else {
// all pods
metricName += "_all"
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$1", nsName, -1)
promql = strings.Replace(promql, "$2", podFilter, -1)
}
} else if nodeID != "" {
// get pod metrics by nodeid
metricName += "_node"
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$3", nodeID, -1)
if podName != "" {
// specific pod
promql = strings.Replace(promql, "$2", podName, -1)
} else {
promql = strings.Replace(promql, "$2", podFilter, -1)
}
}
return promql
}
func MakePVCPromQL(metricName, nsName, pvcName, scName, pvcFilter string) string {
if pvcFilter == "" {
pvcFilter = ".*"
}
var promql = ""
if nsName != "" {
// get pvc metrics by namespace
if pvcName != "" {
// specific pvc
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$1", nsName, -1)
promql = strings.Replace(promql, "$2", pvcName, -1)
} else {
// all pvc in a specific namespace
metricName += "_ns"
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$1", nsName, -1)
promql = strings.Replace(promql, "$2", pvcFilter, -1)
}
} else {
if scName != "" {
// all pvc in a specific storageclass
metricName += "_sc"
promql = RulePromQLTmplMap[metricName]
promql = strings.Replace(promql, "$1", scName, -1)
}
}
return promql
}
func MakeNamespacePromQL(nsName string, nsFilter string, metricsName string) string {
var recordingRule = RulePromQLTmplMap[metricsName]
if nsName != "" {
nsFilter = nsName
} else {
if nsFilter == "" {
nsFilter = ".*"
}
}
recordingRule = strings.Replace(recordingRule, "$1", nsFilter, -1)
return recordingRule
}
// cluster rule
func MakeClusterRule(metricsName string) string {
var rule = RulePromQLTmplMap[metricsName]
return rule
}
// node rule
func MakeNodeRule(nodeID string, nodesFilter string, metricsName string) string {
var rule = RulePromQLTmplMap[metricsName]
if nodesFilter == "" {
nodesFilter = ".*"
}
if strings.Contains(metricsName, "disk_size") || strings.Contains(metricsName, "pod") || strings.Contains(metricsName, "usage") || strings.Contains(metricsName, "inode") || strings.Contains(metricsName, "load") {
// disk size promql
if nodeID != "" {
nodesFilter = "{" + "node" + "=" + "\"" + nodeID + "\"" + "}"
} else {
nodesFilter = "{" + "node" + "=~" + "\"" + nodesFilter + "\"" + "}"
}
rule = strings.Replace(rule, "$1", nodesFilter, -1)
} else {
// cpu, memory, network, disk_iops rules
if nodeID != "" {
// specific node
rule = rule + "{" + "node" + "=" + "\"" + nodeID + "\"" + "}"
} else {
// all nodes or specific nodes filted with re2 syntax
rule = rule + "{" + "node" + "=~" + "\"" + nodesFilter + "\"" + "}"
}
}
return rule
}
func MakeComponentRule(metricsName string) string {
var rule = RulePromQLTmplMap[metricsName]
return rule
}
此差异已折叠。
......@@ -32,29 +32,25 @@ func GetNamespacesWithMetrics(namespaces []*v1.Namespace) []*v1.Namespace {
nsFilter := "^(" + strings.Join(nsNameList, "|") + ")$"
var timeRelateParams = make(url.Values)
params := MonitoringRequestParams{
params := RequestParams{
ResourcesFilter: nsFilter,
Params: timeRelateParams,
QueryType: DefaultQueryType,
QueryParams: timeRelateParams,
QueryType: Query,
MetricsFilter: "namespace_cpu_usage|namespace_memory_usage_wo_cache|namespace_pod_count",
}
rawMetrics := GetNamespaceLevelMetrics(&params)
rawMetrics := GetNamespaceMetrics(params)
for _, result := range rawMetrics.Results {
for _, data := range result.Data.Result {
metricDescMap, ok := data[ResultItemMetric].(map[string]interface{})
if ok {
if ns, exist := metricDescMap[ResultItemMetricResourceName]; exist {
timeAndValue, ok := data[ResultItemValue].([]interface{})
if ok && len(timeAndValue) == 2 {
for i := 0; i < len(namespaces); i++ {
if namespaces[i].Name == ns {
if namespaces[i].Annotations == nil {
namespaces[i].Annotations = make(map[string]string, 0)
}
namespaces[i].Annotations[result.MetricName] = timeAndValue[1].(string)
if ns, exist := data.Metric["namespace"]; exist {
if len(data.Value) == 2 {
for i := 0; i < len(namespaces); i++ {
if namespaces[i].Name == ns {
if namespaces[i].Annotations == nil {
namespaces[i].Annotations = make(map[string]string, 0)
}
namespaces[i].Annotations[result.MetricName] = data.Value[1].(string)
}
}
}
......
package metrics
/*
Copyright 2019 The KubeSphere Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
import "net/url"
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
const (
DefaultQueryStep = "10m"
DefaultQueryTimeout = "10s"
RangeQueryType = "query_range?"
DefaultQueryType = "query?"
*/
package metrics
import (
"kubesphere.io/kubesphere/pkg/simple/client/prometheus"
"net/url"
)
type MonitoringRequestParams struct {
Params url.Values
type RequestParams struct {
QueryParams url.Values
QueryType string
SortMetricName string
SortMetric string
SortType string
PageNum string
LimitNum string
Tp string
Type string
MetricsFilter string
ResourcesFilter string
MetricsName string
NodeName string
WorkspaceName string
NamespaceName string
WorkloadKind string
WorkloadName string
NodeId string
WsName string
NsName string
PodName string
ContainerName string
PVCName string
StorageClassName string
ContainerName string
WorkloadKind string
ComponentName string
}
type APIResponse struct {
MetricName string `json:"metric_name,omitempty" description:"metric name, eg. scheduler_up_sum"`
prometheus.APIResponse
}
type Response struct {
MetricsLevel string `json:"metrics_level" description:"metric level, eg. cluster"`
Results []APIResponse `json:"results" description:"actual array of results"`
CurrentPage int `json:"page,omitempty" description:"current page returned"`
TotalPage int `json:"total_page,omitempty" description:"total number of pages"`
TotalItem int `json:"total_item,omitempty" description:"page size"`
}
......@@ -19,23 +19,33 @@
package metrics
import (
"k8s.io/klog"
"k8s.io/apimachinery/pkg/labels"
"kubesphere.io/kubesphere/pkg/informers"
"kubesphere.io/kubesphere/pkg/simple/client/prometheus"
"math"
"sort"
"strconv"
"unicode"
"runtime/debug"
"github.com/golang/glog"
)
const (
DefaultPageLimit = 5
DefaultPage = 1
ResultTypeVector = "vector"
ResultTypeMatrix = "matrix"
MetricStatusSuccess = "success"
ResultItemMetricResourceName = "resource_name"
ResultSortTypeDesc = "desc"
ResultSortTypeAsc = "asc"
)
type FormatedMetricDataWrapper struct {
fmtMetricData FormatedMetricData
by func(p, q *map[string]interface{}) bool
fmtMetricData prometheus.QueryResult
by func(p, q *prometheus.QueryValue) bool
}
func (wrapper FormatedMetricDataWrapper) Len() int {
......@@ -51,10 +61,10 @@ func (wrapper FormatedMetricDataWrapper) Swap(i, j int) {
}
// sorted metric by ascending or descending order
func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetric) (*FormatedLevelMetric, int) {
func (rawMetrics *Response) SortBy(sortMetricName string, sortType string) (*Response, int) {
defer func() {
if err := recover(); err != nil {
klog.Errorln(err)
glog.Errorln(err)
debug.PrintStack()
}
}()
......@@ -82,31 +92,31 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
if metricItem.MetricName == sortMetricName {
if sortType == ResultSortTypeAsc {
// asc
sort.Sort(FormatedMetricDataWrapper{metricItem.Data, func(p, q *map[string]interface{}) bool {
value1 := (*p)[ResultItemValue].([]interface{})
value2 := (*q)[ResultItemValue].([]interface{})
sort.Sort(FormatedMetricDataWrapper{metricItem.Data, func(p, q *prometheus.QueryValue) bool {
value1 := p.Value
value2 := q.Value
v1, _ := strconv.ParseFloat(value1[len(value1)-1].(string), 64)
v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64)
if v1 == v2 {
resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
return resourceName1.(string) < resourceName2.(string)
resourceName1 := p.Metric[ResultItemMetricResourceName]
resourceName2 := q.Metric[ResultItemMetricResourceName]
return resourceName1 < resourceName2
}
return v1 < v2
}})
} else {
// desc
sort.Sort(FormatedMetricDataWrapper{metricItem.Data, func(p, q *map[string]interface{}) bool {
value1 := (*p)[ResultItemValue].([]interface{})
value2 := (*q)[ResultItemValue].([]interface{})
sort.Sort(FormatedMetricDataWrapper{metricItem.Data, func(p, q *prometheus.QueryValue) bool {
value1 := p.Value
value2 := q.Value
v1, _ := strconv.ParseFloat(value1[len(value1)-1].(string), 64)
v2, _ := strconv.ParseFloat(value2[len(value2)-1].(string), 64)
if v1 == v2 {
resourceName1 := (*p)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
resourceName2 := (*q)[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
return resourceName1.(string) > resourceName2.(string)
resourceName1 := p.Metric[ResultItemMetricResourceName]
resourceName2 := q.Metric[ResultItemMetricResourceName]
return resourceName1 > resourceName2
}
return v1 > v2
......@@ -116,10 +126,10 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
for _, r := range metricItem.Data.Result {
// record the ordering of resource_name to indexMap
// example: {"metric":{ResultItemMetricResourceName: "Deployment:xxx"},"value":[1541142931.731,"3"]}
resourceName, exist := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
resourceName, exist := r.Metric[ResultItemMetricResourceName]
if exist {
if _, exist := indexMap[resourceName.(string)]; !exist {
indexMap[resourceName.(string)] = i
if _, exist := indexMap[resourceName]; !exist {
indexMap[resourceName] = i
i = i + 1
}
}
......@@ -128,9 +138,9 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
// iterator all metric to find max metricItems length
for _, r := range metricItem.Data.Result {
k, ok := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
k, ok := r.Metric[ResultItemMetricResourceName]
if ok {
currentResourceMap[k.(string)] = 1
currentResourceMap[k] = 1
}
}
......@@ -154,12 +164,12 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
for i := 0; i < len(rawMetrics.Results); i++ {
re := rawMetrics.Results[i]
if re.Data.ResultType == ResultTypeVector && re.Status == MetricStatusSuccess {
sortedMetric := make([]map[string]interface{}, len(indexMap))
sortedMetric := make([]prometheus.QueryValue, len(indexMap))
for j := 0; j < len(re.Data.Result); j++ {
r := re.Data.Result[j]
k, exist := r[ResultItemMetric].(map[string]interface{})[ResultItemMetricResourceName]
k, exist := r.Metric[ResultItemMetricResourceName]
if exist {
index, exist := indexMap[k.(string)]
index, exist := indexMap[k]
if exist {
sortedMetric[index] = r
}
......@@ -173,7 +183,7 @@ func Sort(sortMetricName string, sortType string, rawMetrics *FormatedLevelMetri
return rawMetrics, len(indexMap)
}
func Page(pageNum string, limitNum string, fmtLevelMetric *FormatedLevelMetric, maxLength int) interface{} {
func (fmtLevelMetric *Response) Page(pageNum string, limitNum string, maxLength int) *Response {
if maxLength <= 0 {
return fmtLevelMetric
}
......@@ -190,7 +200,7 @@ func Page(pageNum string, limitNum string, fmtLevelMetric *FormatedLevelMetric,
if pageNum != "" {
p, err := strconv.Atoi(pageNum)
if err != nil {
klog.Errorln(err)
glog.Errorln(err)
} else {
if p > 0 {
page = p
......@@ -206,7 +216,7 @@ func Page(pageNum string, limitNum string, fmtLevelMetric *FormatedLevelMetric,
if limitNum != "" {
l, err := strconv.Atoi(limitNum)
if err != nil {
klog.Errorln(err)
glog.Errorln(err)
} else {
if l > 0 {
limit = l
......@@ -245,72 +255,40 @@ func Page(pageNum string, limitNum string, fmtLevelMetric *FormatedLevelMetric,
return fmtLevelMetric
}
// maybe this function is time consuming
// The metric param is the result from Prometheus HTTP query
func ReformatJson(metric string, metricsName string, needAddParams map[string]string, needDelParams ...string) *FormatedMetric {
var formatMetric FormatedMetric
err := jsonIter.Unmarshal([]byte(metric), &formatMetric)
func getNodeAddressAndRole(nodeName string) (string, string) {
nodeLister := informers.SharedInformerFactory().Core().V1().Nodes().Lister()
node, err := nodeLister.Get(nodeName)
if err != nil {
klog.Errorln("Unmarshal metric json failed", err.Error(), metric)
return "", ""
}
if formatMetric.MetricName == "" {
if metricsName != "" {
formatMetric.MetricName = metricsName
}
}
// retrive metrics success
if formatMetric.Status == MetricStatusSuccess {
result := formatMetric.Data.Result
for _, res := range result {
metric, exist := res[ResultItemMetric]
// Prometheus query result format: .data.result[].metric
// metricMap is the value of .data.result[].metric
metricMap, sure := metric.(map[string]interface{})
if exist && sure {
delete(metricMap, "__name__")
}
if len(needDelParams) > 0 {
for _, p := range needDelParams {
delete(metricMap, p)
}
}
if needAddParams != nil && len(needAddParams) > 0 {
for n := range needAddParams {
if v, ok := metricMap[n]; ok {
delete(metricMap, n)
metricMap[ResultItemMetricResourceName] = v
} else {
metricMap[ResultItemMetricResourceName] = needAddParams[n]
}
}
}
var addr string
for _, address := range node.Status.Addresses {
if address.Type == "InternalIP" {
addr = address.Address
break
}
}
return &formatMetric
role := "node"
_, exists := node.Labels["node-role.kubernetes.io/master"]
if exists {
role = "master"
}
return addr, role
}
func ReformatNodeStatusField(nodeMetric *FormatedMetric) *FormatedMetric {
metricCount := len(nodeMetric.Data.Result)
for i := 0; i < metricCount; i++ {
metric, exist := nodeMetric.Data.Result[i][ResultItemMetric]
if exist {
status, exist := metric.(map[string]interface{})[MetricStatus]
if exist {
status = UpperFirstLetter(status.(string))
metric.(map[string]interface{})[MetricStatus] = status
func getNodeName(nodeIp string) string {
nodeLister := informers.SharedInformerFactory().Core().V1().Nodes().Lister()
nodes, _ := nodeLister.List(labels.Everything())
for _, node := range nodes {
for _, address := range node.Status.Addresses {
if address.Type == "InternalIP" && address.Address == nodeIp {
return node.Name
}
}
}
return nodeMetric
}
func UpperFirstLetter(str string) string {
for i, ch := range str {
return string(unicode.ToUpper(ch)) + str[i+1:]
}
return ""
}
......@@ -168,11 +168,11 @@ func DeleteWorkspaceRoleBinding(workspace, username string, role string) error {
return err
}
func GetDevOpsProjects(workspaceName string) ([]string, error) {
func GetDevOpsProjectsCount(workspaceName string) (int, error) {
dbconn, err := clientset.ClientSets().MySQL()
if err != nil {
return nil, err
return 0, err
}
query := dbconn.Select(devops.DevOpsProjectIdColumn).
......@@ -183,9 +183,9 @@ func GetDevOpsProjects(workspaceName string) ([]string, error) {
devOpsProjects := make([]string, 0)
if _, err := query.Load(&devOpsProjects); err != nil {
return nil, err
return 0, err
}
return devOpsProjects, nil
return len(devOpsProjects), nil
}
func WorkspaceUserCount(workspace string) (int, error) {
......@@ -196,24 +196,24 @@ func WorkspaceUserCount(workspace string) (int, error) {
return count, nil
}
func GetOrgRoles(name string) ([]string, error) {
return constants.WorkSpaceRoles, nil
func GetOrgRolesCount(name string) (int, error) {
return len(constants.WorkSpaceRoles), nil
}
func WorkspaceNamespaces(workspaceName string) ([]string, error) {
func WorkspaceNamespaceCount(workspaceName string) (int, error) {
ns, err := Namespaces(workspaceName)
namespaces := make([]string, 0)
if err != nil {
return namespaces, err
return 0, err
}
for i := 0; i < len(ns); i++ {
namespaces = append(namespaces, ns[i].Name)
}
return namespaces, nil
return len(namespaces), nil
}
func WorkspaceCount() (int, error) {
......
......@@ -18,12 +18,36 @@
package prometheus
import (
"fmt"
jsoniter "github.com/json-iterator/go"
"io/ioutil"
"k8s.io/klog"
"net/http"
"time"
)
// Prometheus query api response
type APIResponse struct {
Status string `json:"status" description:"result status, one of error, success"`
Data QueryResult `json:"data" description:"actual metric result"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
Warnings []string `json:"warnings,omitempty"`
}
// QueryResult includes result data from a query.
type QueryResult struct {
ResultType string `json:"resultType" description:"result type, one of matrix, vector"`
Result []QueryValue `json:"result" description:"metric data including labels, time series and values"`
}
// Time Series
type QueryValue struct {
Metric map[string]string `json:"metric,omitempty" description:"time series labels"`
Value []interface{} `json:"value,omitempty" description:"time series, values of vector type"`
Values [][]interface{} `json:"values,omitempty" description:"time series, values of matrix type"`
}
type PrometheusClient struct {
client *http.Client
endpoint string
......@@ -40,28 +64,40 @@ func NewPrometheusClient(options *PrometheusOptions) (*PrometheusClient, error)
}, nil
}
func (c *PrometheusClient) SendMonitoringRequest(queryType string, params string) string {
return c.sendMonitoringRequest(c.endpoint, queryType, params)
func (c *PrometheusClient) QueryToK8SPrometheus(queryType string, params string) (apiResponse APIResponse) {
return c.query(c.endpoint, queryType, params)
}
func (c *PrometheusClient) SendSecondaryMonitoringRequest(queryType string, params string) string {
return c.sendMonitoringRequest(c.secondaryEndpoint, queryType, params)
func (c *PrometheusClient) QueryToK8SSystemPrometheus(queryType string, params string) (apiResponse APIResponse) {
return c.query(c.secondaryEndpoint, queryType, params)
}
func (c *PrometheusClient) sendMonitoringRequest(endpoint string, queryType string, params string) string {
epurl := endpoint + queryType + params
response, err := c.client.Get(epurl)
var jsonIter = jsoniter.ConfigCompatibleWithStandardLibrary
func (c *PrometheusClient) query(endpoint string, queryType string, params string) (apiResponse APIResponse) {
url := fmt.Sprintf("%s/api/v1/%s?%s", endpoint, queryType, params)
response, err := c.client.Get(url)
if err != nil {
klog.Error(err)
} else {
defer response.Body.Close()
apiResponse.Status = "error"
return apiResponse
}
defer response.Body.Close()
contents, err := ioutil.ReadAll(response.Body)
body, err := ioutil.ReadAll(response.Body)
if err != nil {
klog.Error(err)
apiResponse.Status = "error"
return apiResponse
}
if err != nil {
klog.Error(err)
}
return string(contents)
err = jsonIter.Unmarshal(body, &apiResponse)
if err != nil {
klog.Errorf("fail to unmarshal prometheus query result: %s", err.Error())
apiResponse.Status = "error"
return apiResponse
}
return ""
return apiResponse
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册