未验证 提交 4fd6e3e9 编写于 作者: K KubeSphere CI Bot 提交者: GitHub

Merge pull request #3173 from Ma-Dan/master

Add metrics-server client to monitoring
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch" eventsclient "kubesphere.io/kubesphere/pkg/simple/client/events/elasticsearch"
"kubesphere.io/kubesphere/pkg/simple/client/k8s" "kubesphere.io/kubesphere/pkg/simple/client/k8s"
esclient "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch" esclient "kubesphere.io/kubesphere/pkg/simple/client/logging/elasticsearch"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring/metricsserver"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus" "kubesphere.io/kubesphere/pkg/simple/client/monitoring/prometheus"
"kubesphere.io/kubesphere/pkg/simple/client/openpitrix" "kubesphere.io/kubesphere/pkg/simple/client/openpitrix"
"kubesphere.io/kubesphere/pkg/simple/client/s3" "kubesphere.io/kubesphere/pkg/simple/client/s3"
...@@ -124,6 +125,8 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS ...@@ -124,6 +125,8 @@ func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIS
apiServer.MonitoringClient = monitoringClient apiServer.MonitoringClient = monitoringClient
} }
apiServer.MetricsClient = metricsserver.NewMetricsClient(kubernetesClient.Kubernetes(), s.KubernetesOptions)
if s.LoggingOptions.Host != "" { if s.LoggingOptions.Host != "" {
loggingClient, err := esclient.NewClient(s.LoggingOptions) loggingClient, err := esclient.NewClient(s.LoggingOptions)
if err != nil { if err != nil {
......
...@@ -132,6 +132,8 @@ type APIServer struct { ...@@ -132,6 +132,8 @@ type APIServer struct {
// monitoring client set // monitoring client set
MonitoringClient monitoring.Interface MonitoringClient monitoring.Interface
MetricsClient monitoring.Interface
// //
OpenpitrixClient openpitrix.Client OpenpitrixClient openpitrix.Client
...@@ -212,7 +214,7 @@ func (s *APIServer) installKubeSphereAPIs() { ...@@ -212,7 +214,7 @@ func (s *APIServer) installKubeSphereAPIs() {
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config)) urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache)) urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.InformerFactory, s.OpenpitrixClient)) urlruntime.Must(monitoringv1alpha3.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.MonitoringClient, s.MetricsClient, s.InformerFactory, s.OpenpitrixClient))
urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient)) urlruntime.Must(openpitrixv1.AddToContainer(s.container, s.InformerFactory, s.OpenpitrixClient))
urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes())) urlruntime.Must(operationsv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes()))
urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory, urlruntime.Must(resourcesv1alpha2.AddToContainer(s.container, s.KubernetesClient.Kubernetes(), s.InformerFactory,
......
...@@ -35,8 +35,8 @@ type handler struct { ...@@ -35,8 +35,8 @@ type handler struct {
mo model.MonitoringOperator mo model.MonitoringOperator
} }
func newHandler(k kubernetes.Interface, m monitoring.Interface, f informers.InformerFactory, o openpitrix.Client) *handler { func newHandler(k kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, f informers.InformerFactory, o openpitrix.Client) *handler {
return &handler{k, model.NewMonitoringOperator(m, k, f, o)} return &handler{k, model.NewMonitoringOperator(monitoringClient, metricsClient, k, f, o)}
} }
func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) { func (h handler) handleKubeSphereMetricsQuery(req *restful.Request, resp *restful.Response) {
......
...@@ -217,7 +217,7 @@ func TestParseRequestParams(t *testing.T) { ...@@ -217,7 +217,7 @@ func TestParseRequestParams(t *testing.T) {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
client := fake.NewSimpleClientset(&tt.namespace) client := fake.NewSimpleClientset(&tt.namespace)
fakeInformerFactory := informers.NewInformerFactories(client, nil, nil, nil, nil, nil) fakeInformerFactory := informers.NewInformerFactories(client, nil, nil, nil, nil, nil)
handler := newHandler(client, nil, fakeInformerFactory, nil) handler := newHandler(client, nil, nil, fakeInformerFactory, nil)
result, err := handler.makeQueryOptions(tt.params, tt.lvl) result, err := handler.makeQueryOptions(tt.params, tt.lvl)
if err != nil { if err != nil {
......
...@@ -39,10 +39,10 @@ const ( ...@@ -39,10 +39,10 @@ const (
var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"} var GroupVersion = schema.GroupVersion{Group: groupName, Version: "v1alpha3"}
func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, factory informers.InformerFactory, opClient openpitrix.Client) error { func AddToContainer(c *restful.Container, k8sClient kubernetes.Interface, monitoringClient monitoring.Interface, metricsClient monitoring.Interface, factory informers.InformerFactory, opClient openpitrix.Client) error {
ws := runtime.NewWebService(GroupVersion) ws := runtime.NewWebService(GroupVersion)
h := newHandler(k8sClient, monitoringClient, factory, opClient) h := newHandler(k8sClient, monitoringClient, metricsClient, factory, opClient)
ws.Route(ws.GET("/kubesphere"). ws.Route(ws.GET("/kubesphere").
To(h.handleKubeSphereMetricsQuery). To(h.handleKubeSphereMetricsQuery).
......
...@@ -47,18 +47,20 @@ type MonitoringOperator interface { ...@@ -47,18 +47,20 @@ type MonitoringOperator interface {
} }
type monitoringOperator struct { type monitoringOperator struct {
c monitoring.Interface prometheus monitoring.Interface
k8s kubernetes.Interface metricsserver monitoring.Interface
ks ksinformers.SharedInformerFactory k8s kubernetes.Interface
op openpitrix.Interface ks ksinformers.SharedInformerFactory
op openpitrix.Interface
} }
func NewMonitoringOperator(client monitoring.Interface, k8s kubernetes.Interface, factory informers.InformerFactory, opClient opclient.Client) MonitoringOperator { func NewMonitoringOperator(monitoringClient monitoring.Interface, metricsClient monitoring.Interface, k8s kubernetes.Interface, factory informers.InformerFactory, opClient opclient.Client) MonitoringOperator {
return &monitoringOperator{ return &monitoringOperator{
c: client, prometheus: monitoringClient,
k8s: k8s, metricsserver: metricsClient,
ks: factory.KubeSphereSharedInformerFactory(), k8s: k8s,
op: openpitrix.NewOpenpitrixOperator(factory.KubernetesSharedInformerFactory(), opClient), ks: factory.KubeSphereSharedInformerFactory(),
op: openpitrix.NewOpenpitrixOperator(factory.KubernetesSharedInformerFactory(), opClient),
} }
} }
...@@ -74,7 +76,7 @@ func (mo monitoringOperator) GetMetric(expr, namespace string, time time.Time) ( ...@@ -74,7 +76,7 @@ func (mo monitoringOperator) GetMetric(expr, namespace string, time time.Time) (
return monitoring.Metric{}, err return monitoring.Metric{}, err
} }
} }
return mo.c.GetMetric(expr, time), nil return mo.prometheus.GetMetric(expr, time), nil
} }
func (mo monitoringOperator) GetMetricOverTime(expr, namespace string, start, end time.Time, step time.Duration) (monitoring.Metric, error) { func (mo monitoringOperator) GetMetricOverTime(expr, namespace string, start, end time.Time, step time.Duration) (monitoring.Metric, error) {
...@@ -89,21 +91,55 @@ func (mo monitoringOperator) GetMetricOverTime(expr, namespace string, start, en ...@@ -89,21 +91,55 @@ func (mo monitoringOperator) GetMetricOverTime(expr, namespace string, start, en
return monitoring.Metric{}, err return monitoring.Metric{}, err
} }
} }
return mo.c.GetMetricOverTime(expr, start, end, step), nil return mo.prometheus.GetMetricOverTime(expr, start, end, step), nil
} }
func (mo monitoringOperator) GetNamedMetrics(metrics []string, time time.Time, opt monitoring.QueryOption) Metrics { func (mo monitoringOperator) GetNamedMetrics(metrics []string, time time.Time, opt monitoring.QueryOption) Metrics {
ress := mo.c.GetNamedMetrics(metrics, time, opt) ress := mo.prometheus.GetNamedMetrics(metrics, time, opt)
if mo.metricsserver != nil {
mr := mo.metricsserver.GetNamedMetrics(metrics, time, opt)
//Merge edge node metrics data
edgeMetrics := make(map[string]monitoring.MetricData)
for _, metric := range mr {
edgeMetrics[metric.MetricName] = metric.MetricData
}
for i, metric := range ress {
if val, ok := edgeMetrics[metric.MetricName]; ok {
ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
}
}
}
return Metrics{Results: ress} return Metrics{Results: ress}
} }
func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics { func (mo monitoringOperator) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, opt monitoring.QueryOption) Metrics {
ress := mo.c.GetNamedMetricsOverTime(metrics, start, end, step, opt) ress := mo.prometheus.GetNamedMetricsOverTime(metrics, start, end, step, opt)
if mo.metricsserver != nil {
mr := mo.metricsserver.GetNamedMetricsOverTime(metrics, start, end, step, opt)
//Merge edge node metrics data
edgeMetrics := make(map[string]monitoring.MetricData)
for _, metric := range mr {
edgeMetrics[metric.MetricName] = metric.MetricData
}
for i, metric := range ress {
if val, ok := edgeMetrics[metric.MetricName]; ok {
ress[i].MetricData.MetricValues = append(ress[i].MetricData.MetricValues, val.MetricValues...)
}
}
}
return Metrics{Results: ress} return Metrics{Results: ress}
} }
func (mo monitoringOperator) GetMetadata(namespace string) Metadata { func (mo monitoringOperator) GetMetadata(namespace string) Metadata {
data := mo.c.GetMetadata(namespace) data := mo.prometheus.GetMetadata(namespace)
return Metadata{Data: data} return Metadata{Data: data}
} }
...@@ -121,7 +157,7 @@ func (mo monitoringOperator) GetMetricLabelSet(metric, namespace string, start, ...@@ -121,7 +157,7 @@ func (mo monitoringOperator) GetMetricLabelSet(metric, namespace string, start,
return MetricLabelSet{} return MetricLabelSet{}
} }
} }
data := mo.c.GetMetricLabelSet(expr, start, end) data := mo.prometheus.GetMetricLabelSet(expr, start, end)
return MetricLabelSet{Data: data} return MetricLabelSet{Data: data}
} }
......
/*
Copyright 2020 KubeSphere Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricsserver
import (
"context"
"errors"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
promlabels "github.com/prometheus/prometheus/pkg/labels"
metricsapi "k8s.io/metrics/pkg/apis/metrics"
metricsV1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
"kubesphere.io/kubesphere/pkg/simple/client/k8s"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
// metricsServer implements monitoring interface backend by metrics-server
type metricsServer struct {
// metricsAPIAvailable records whether a supported metrics.k8s.io version
// was discovered at construction time; query methods fail fast when false.
metricsAPIAvailable bool
// metricsClient talks to the metrics.k8s.io API for node usage snapshots.
metricsClient metricsclient.Interface
// k8s is the core Kubernetes client, used to list edge nodes and read
// their capacities and addresses.
k8s kubernetes.Interface
}
var (
// supportedMetricsAPIs enumerates the metrics.k8s.io versions this
// client knows how to consume.
supportedMetricsAPIs = map[string]bool{
"v1beta1": true,
}
)
// edgeNodeLabel is the node label used to select edge nodes.
const edgeNodeLabel = "node-role.kubernetes.io/edge"
// metricsAPISupported reports whether the discovered API groups expose a
// metrics.k8s.io version that this client supports.
func metricsAPISupported(discoveredAPIGroups *metav1.APIGroupList) bool {
	for _, group := range discoveredAPIGroups.Groups {
		if group.Name != metricsapi.GroupName {
			continue
		}
		for _, ver := range group.Versions {
			if supportedMetricsAPIs[ver.Version] {
				return true
			}
		}
	}
	return false
}
// listEdgeNodes returns every node carrying the edge role label, keyed by
// node name. On a failed List call the (possibly empty) map and the error
// are both returned.
func (m metricsServer) listEdgeNodes() (map[string]v1.Node, error) {
	result := make(map[string]v1.Node)
	listOpts := metav1.ListOptions{LabelSelector: edgeNodeLabel}
	nodeList, err := m.k8s.CoreV1().Nodes().List(context.TODO(), listOpts)
	if err != nil {
		return result, err
	}
	for _, node := range nodeList.Items {
		result[node.Name] = node
	}
	return result, nil
}
// filterEdgeNodeNames applies the query's ResourceFilter regexp to the given
// edge nodes and returns the set of node names that match. A bad regexp is
// logged and yields an empty set.
func (m metricsServer) filterEdgeNodeNames(edgeNodes map[string]v1.Node, opts *monitoring.QueryOptions) map[string]bool {
	matched := make(map[string]bool)
	matcher, err := promlabels.NewMatcher(promlabels.MatchRegexp, "edgenodefilter", opts.ResourceFilter)
	if err != nil {
		klog.Errorf("Edge node filter regexp error %v\n", err)
		return matched
	}
	for _, node := range edgeNodes {
		if matcher.Matches(node.Name) {
			matched[node.Name] = true
		}
	}
	return matched
}
// getNodeMetricsFromMetricsAPI lists NodeMetrics for edge-labelled nodes via
// the metrics.k8s.io v1beta1 API and converts the result to the internal
// metrics API version.
func (m metricsServer) getNodeMetricsFromMetricsAPI() (*metricsapi.NodeMetricsList, error) {
	listOpts := metav1.ListOptions{LabelSelector: edgeNodeLabel}
	versioned, err := m.metricsClient.MetricsV1beta1().NodeMetricses().List(context.TODO(), listOpts)
	if err != nil {
		return nil, err
	}
	converted := &metricsapi.NodeMetricsList{}
	if err := metricsV1beta1.Convert_v1beta1_NodeMetricsList_To_metrics_NodeMetricsList(versioned, converted, nil); err != nil {
		return nil, err
	}
	return converted, nil
}
// NewMetricsClient constructs a metrics-server backed monitoring.Interface.
// It returns nil (not an error) when the kubeconfig cannot be loaded, API
// discovery fails, no supported metrics.k8s.io version is exposed, or the
// metrics clientset cannot be built — callers must nil-check the result.
func NewMetricsClient(k kubernetes.Interface, options *k8s.KubernetesOptions) monitoring.Interface {
config, err := clientcmd.BuildConfigFromFlags("", options.KubeConfig)
if err != nil {
klog.Error(err)
return nil
}
// Probe the cluster's API groups to confirm metrics.k8s.io is served.
discoveryClient := k.Discovery()
apiGroups, err := discoveryClient.ServerGroups()
if err != nil {
klog.Error(err)
return nil
}
metricsAPIAvailable := metricsAPISupported(apiGroups)
if !metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return nil
}
metricsClient, err := metricsclient.NewForConfig(config)
if err != nil {
klog.Error(err)
return nil
}
return NewMetricsServer(k, metricsAPIAvailable, metricsClient)
}
// NewMetricsServer assembles a metrics-server backed monitoring.Interface
// from an existing Kubernetes client, the API-availability flag, and a
// metrics clientset.
func NewMetricsServer(k kubernetes.Interface, a bool, m metricsclient.Interface) monitoring.Interface {
	return metricsServer{
		metricsAPIAvailable: a,
		metricsClient:       m,
		k8s:                 k,
	}
}
// GetMetric is not implemented for the metrics-server backend; expression
// queries are handled elsewhere. It always returns a zero-value Metric.
func (m metricsServer) GetMetric(expr string, ts time.Time) monitoring.Metric {
	return monitoring.Metric{}
}
// GetMetricOverTime is not implemented for the metrics-server backend; it
// always returns a zero-value Metric.
func (m metricsServer) GetMetricOverTime(expr string, start, end time.Time, step time.Duration) monitoring.Metric {
	return monitoring.Metric{}
}
// Metric names this backend can serve for edge nodes. The "Ultilisation"
// misspelling exists only in the constant identifiers; the exposed metric
// names are spelled "utilisation".
const (
metricsNodeCPUUsage = "node_cpu_usage"
metricsNodeCPUTotal = "node_cpu_total"
metricsNodeCPUUltilisation = "node_cpu_utilisation"
metricsNodeMemoryUsageWoCache = "node_memory_usage_wo_cache"
metricsNodeMemoryTotal = "node_memory_total"
metricsNodeMemoryUltilisation = "node_memory_utilisation"
)
// edgeNodeMetrics lists every metric the edge-node collectors iterate over,
// and fixes the order in which results are emitted.
var edgeNodeMetrics = []string{metricsNodeCPUUsage, metricsNodeCPUTotal, metricsNodeCPUUltilisation, metricsNodeMemoryUsageWoCache, metricsNodeMemoryTotal, metricsNodeMemoryUltilisation}
// parseErrorResp builds one Metric per requested metric name, each carrying
// the given error string, so callers receive a per-metric error response.
//
// Bug fix: the constructed parsedResp was previously never appended to res,
// so every error path returned an empty slice and the error was silently
// dropped.
func (m metricsServer) parseErrorResp(metrics []string, err error) []monitoring.Metric {
	var res []monitoring.Metric
	for _, metric := range metrics {
		parsedResp := monitoring.Metric{MetricName: metric}
		parsedResp.Error = err.Error()
		res = append(res, parsedResp)
	}
	return res
}
// GetNamedMetrics returns one point-in-time sample per requested metric for
// edge nodes, backed by the metrics.k8s.io API. Only node-level queries are
// handled; for any other level the nil result is returned unchanged.
func (m metricsServer) GetNamedMetrics(metrics []string, ts time.Time, o monitoring.QueryOption) []monitoring.Metric {
var res []monitoring.Metric
opts := monitoring.NewQueryOptions()
o.Apply(opts)
if opts.Level == monitoring.LevelNode {
if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
}
// Collect edge nodes and keep only those matching the resource filter.
edgeNodes, err := m.listEdgeNodes()
if err != nil {
klog.Errorf("List edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
if len(edgeNodeNamesFiltered) == 0 {
klog.V(4).Infof("No edge node metrics is requested")
return res
}
metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err)
return m.parseErrorResp(metrics, err)
}
// Set of requested metric names for quick membership tests.
// NOTE(review): the loop variable below shadows the receiver m.
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
// Capacities/addresses come from the Node objects; usage from metrics-server.
status := make(map[string]v1.NodeStatus)
for n, _ := range edgeNodeNamesFiltered {
status[n] = edgeNodes[n].Status
}
// One vector-typed accumulator per requested edge metric.
nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeVector}
}
}
var usage v1.ResourceList
var cap v1.ResourceList
for _, m := range metricsResult.Items {
// Skip samples for nodes that did not pass the name filter.
_, ok := edgeNodeNamesFiltered[m.Name]
if !ok {
continue
}
m.Usage.DeepCopyInto(&usage)
status[m.Name].Capacity.DeepCopyInto(&cap)
// Per-node metric values, labelled with node name and the edge role.
metricValues := make(map[string]*monitoring.MetricValue)
for _, enm := range edgeNodeMetrics {
metricValues[enm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
}
metricValues[enm].Metadata["node"] = m.Name
metricValues[enm].Metadata["role"] = "edge"
}
// Attach the node's first internal IP as host_ip metadata.
for _, addr := range status[m.Name].Addresses {
if addr.Type == v1.NodeInternalIP {
for _, enm := range edgeNodeMetrics {
metricValues[enm].Metadata["host_ip"] = addr.Address
}
break
}
}
// CPU values are converted from millicores to cores; memory is in
// bytes; utilisation is usage divided by capacity.
_, ok = metricsMap[metricsNodeCPUUsage]
if ok {
metricValues[metricsNodeCPUUsage].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000}
}
_, ok = metricsMap[metricsNodeCPUTotal]
if ok {
metricValues[metricsNodeCPUTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000}
}
_, ok = metricsMap[metricsNodeCPUUltilisation]
if ok {
metricValues[metricsNodeCPUUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())}
}
_, ok = metricsMap[metricsNodeMemoryUsageWoCache]
if ok {
metricValues[metricsNodeMemoryUsageWoCache].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())}
}
_, ok = metricsMap[metricsNodeMemoryTotal]
if ok {
metricValues[metricsNodeMemoryTotal].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())}
}
_, ok = metricsMap[metricsNodeMemoryUltilisation]
if ok {
metricValues[metricsNodeMemoryUltilisation].Sample = &monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())}
}
for _, enm := range edgeNodeMetrics {
_, ok = metricsMap[enm]
if ok {
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
}
}
}
// Emit results in canonical edgeNodeMetrics order.
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
}
}
}
return res
}
// GetNamedMetricsOverTime returns matrix-typed metrics for edge nodes. The
// metrics-server API holds no history, so start/end/step are effectively
// ignored: a single List is made and each series contains exactly one point
// (the latest sample). Only node-level queries are handled.
func (m metricsServer) GetNamedMetricsOverTime(metrics []string, start, end time.Time, step time.Duration, o monitoring.QueryOption) []monitoring.Metric {
var res []monitoring.Metric
opts := monitoring.NewQueryOptions()
o.Apply(opts)
if opts.Level == monitoring.LevelNode {
if !m.metricsAPIAvailable {
klog.Warningf("Metrics API not available.")
return m.parseErrorResp(metrics, errors.New("Metrics API not available."))
}
// Collect edge nodes and keep only those matching the resource filter.
edgeNodes, err := m.listEdgeNodes()
if err != nil {
klog.Errorf("List edge nodes error %v\n", err)
return m.parseErrorResp(metrics, err)
}
edgeNodeNamesFiltered := m.filterEdgeNodeNames(edgeNodes, opts)
if len(edgeNodeNamesFiltered) == 0 {
klog.V(4).Infof("No edge node metrics is requested")
return res
}
metricsResult, err := m.getNodeMetricsFromMetricsAPI()
if err != nil {
klog.Errorf("Get edge node metrics error %v\n", err)
return m.parseErrorResp(metrics, err)
}
// Set of requested metric names for quick membership tests.
// NOTE(review): the loop variable below shadows the receiver m.
metricsMap := make(map[string]bool)
for _, m := range metrics {
metricsMap[m] = true
}
// Capacities/addresses come from the Node objects; usage from metrics-server.
status := make(map[string]v1.NodeStatus)
for n, _ := range edgeNodeNamesFiltered {
status[n] = edgeNodes[n].Status
}
// One matrix-typed accumulator per requested edge metric.
nodeMetrics := make(map[string]*monitoring.MetricData)
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
nodeMetrics[enm] = &monitoring.MetricData{MetricType: monitoring.MetricTypeMatrix}
}
}
var usage v1.ResourceList
var cap v1.ResourceList
for _, m := range metricsResult.Items {
// Skip samples for nodes that did not pass the name filter.
_, ok := edgeNodeNamesFiltered[m.Name]
if !ok {
continue
}
m.Usage.DeepCopyInto(&usage)
status[m.Name].Capacity.DeepCopyInto(&cap)
// Per-node metric values, labelled with node name and the edge role.
metricValues := make(map[string]*monitoring.MetricValue)
for _, enm := range edgeNodeMetrics {
metricValues[enm] = &monitoring.MetricValue{
Metadata: make(map[string]string),
}
metricValues[enm].Metadata["node"] = m.Name
metricValues[enm].Metadata["role"] = "edge"
}
// Attach the node's first internal IP as host_ip metadata.
for _, addr := range status[m.Name].Addresses {
if addr.Type == v1.NodeInternalIP {
for _, enm := range edgeNodeMetrics {
metricValues[enm].Metadata["host_ip"] = addr.Address
}
break
}
}
// Each series receives one point: CPU in cores, memory in bytes,
// utilisation as usage divided by capacity.
_, ok = metricsMap[metricsNodeCPUUsage]
if ok {
metricValues[metricsNodeCPUUsage].Series = append(metricValues[metricsNodeCPUUsage].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / 1000})
}
_, ok = metricsMap[metricsNodeCPUTotal]
if ok {
metricValues[metricsNodeCPUTotal].Series = append(metricValues[metricsNodeCPUTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Cpu().MilliValue()) / 1000})
}
_, ok = metricsMap[metricsNodeCPUUltilisation]
if ok {
metricValues[metricsNodeCPUUltilisation].Series = append(metricValues[metricsNodeCPUUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Cpu().MilliValue()) / float64(cap.Cpu().MilliValue())})
}
_, ok = metricsMap[metricsNodeMemoryUsageWoCache]
if ok {
metricValues[metricsNodeMemoryUsageWoCache].Series = append(metricValues[metricsNodeMemoryUsageWoCache].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value())})
}
_, ok = metricsMap[metricsNodeMemoryTotal]
if ok {
metricValues[metricsNodeMemoryTotal].Series = append(metricValues[metricsNodeMemoryTotal].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(cap.Memory().Value())})
}
_, ok = metricsMap[metricsNodeMemoryUltilisation]
if ok {
metricValues[metricsNodeMemoryUltilisation].Series = append(metricValues[metricsNodeMemoryUltilisation].Series, monitoring.Point{float64(m.Timestamp.Unix()), float64(usage.Memory().Value()) / float64(cap.Memory().Value())})
}
for _, enm := range edgeNodeMetrics {
_, ok = metricsMap[enm]
if ok {
nodeMetrics[enm].MetricValues = append(nodeMetrics[enm].MetricValues, *metricValues[enm])
}
}
}
// Emit results in canonical edgeNodeMetrics order.
for _, enm := range edgeNodeMetrics {
_, ok := metricsMap[enm]
if ok {
res = append(res, monitoring.Metric{MetricName: enm, MetricData: *nodeMetrics[enm]})
}
}
}
return res
}
// GetMetadata is not supported by the metrics-server backend; it always
// returns an empty (nil) slice.
func (m metricsServer) GetMetadata(namespace string) []monitoring.Metadata {
	return nil
}
// GetMetricLabelSet is not supported by the metrics-server backend; it
// always returns an empty (nil) slice.
func (m metricsServer) GetMetricLabelSet(expr string, start, end time.Time) []map[string]string {
	return nil
}
package metricsserver
import (
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/json-iterator/go"
"io/ioutil"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
fakek8s "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
metricsV1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"kubesphere.io/kubesphere/pkg/simple/client/monitoring"
)
// mergeResourceLists will merge resoure lists. When two lists have the same resourece, the value from
// the last list will be present in the result
// mergeResourceLists merges the given resource lists into one. When several
// lists define the same resource, the quantity from the last list wins.
func mergeResourceLists(resourceLists ...corev1.ResourceList) corev1.ResourceList {
	merged := corev1.ResourceList{}
	for _, list := range resourceLists {
		for name, qty := range list {
			merged[name] = qty
		}
	}
	return merged
}
// getResourceList builds a ResourceList from optional CPU and memory
// quantity strings; an empty string omits that resource.
func getResourceList(cpu, memory string) corev1.ResourceList {
	list := corev1.ResourceList{}
	if cpu != "" {
		list[corev1.ResourceCPU] = resource.MustParse(cpu)
	}
	if memory != "" {
		list[corev1.ResourceMemory] = resource.MustParse(memory)
	}
	return list
}
// nodeCapacity is the shared capacity (8 CPU, 8Gi memory) given to both
// fixture edge nodes below.
var nodeCapacity = mergeResourceLists(getResourceList("8", "8Gi"))
// node1 and node2 are fixture nodes labelled with the edge role so that
// listEdgeNodes picks them up in the tests.
var node1 = &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-1",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Status: corev1.NodeStatus{
Capacity: nodeCapacity,
},
}
var node2 = &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-2",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Status: corev1.NodeStatus{
Capacity: nodeCapacity,
},
}
// TestGetNamedMetrics verifies point-in-time edge-node metrics against
// golden JSON files in ./testdata, using fake Kubernetes and metrics
// clientsets.
func TestGetNamedMetrics(t *testing.T) {
tests := []struct {
metrics []string
filter string
expected string
}{
{
metrics: []string{"node_cpu_usage", "node_memory_usage_wo_cache"},
filter: ".*",
expected: "metrics-vector-1.json",
},
{
metrics: []string{"node_cpu_usage", "node_cpu_utilisation"},
filter: "edgenode-2",
expected: "metrics-vector-2.json",
},
{
metrics: []string{"node_memory_usage_wo_cache", "node_memory_utilisation"},
filter: "edgenode-1|edgenode-2",
expected: "metrics-vector-3.json",
},
}
// Two fake edge nodes sharing the 8-CPU / 8Gi capacity fixture.
fakeK8sClient := fakek8s.NewSimpleClientset(node1, node2)
informer := informers.NewSharedInformerFactory(fakeK8sClient, 0)
informer.Core().V1().Nodes().Informer().GetIndexer().Add(node1)
informer.Core().V1().Nodes().Informer().GetIndexer().Add(node2)
fakeMetricsclient := &fakemetricsclient.Clientset{}
layout := "2006-01-02T15:04:05.000Z"
str := "2021-01-25T12:34:56.789Z"
metricsTime, _ := time.Parse(layout, str)
// Stub the metrics API "list nodes" call with fixed usage numbers
// (edgenode-1: 1 core / 1Mi; edgenode-2: 2 cores / 2Mi).
fakeMetricsclient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
metrics := &metricsV1beta1.NodeMetricsList{}
nodeMetric1 := metricsV1beta1.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-1",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Timestamp: metav1.Time{Time: metricsTime},
Window: metav1.Duration{Duration: time.Minute},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(1000),
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(1024*1024),
resource.BinarySI),
},
}
nodeMetric2 := metricsV1beta1.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-2",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Timestamp: metav1.Time{Time: metricsTime},
Window: metav1.Duration{Duration: time.Minute},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(2000),
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(2*1024*1024),
resource.BinarySI),
},
}
metrics.Items = append(metrics.Items, nodeMetric1)
metrics.Items = append(metrics.Items, nodeMetric2)
return true, metrics, nil
})
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
// Load the golden expectation and compare against the live result.
expected := make([]monitoring.Metric, 0)
err := jsonFromFile(tt.expected, &expected)
if err != nil {
t.Fatal(err)
}
client := NewMetricsServer(fakeK8sClient, true, fakeMetricsclient)
result := client.GetNamedMetrics(tt.metrics, time.Now(), monitoring.NodeOption{ResourceFilter: tt.filter})
if diff := cmp.Diff(result, expected); diff != "" {
t.Fatalf("%T differ (-got, +want): %s", expected, diff)
}
})
}
}
// TestGetNamedMetricsOverTime verifies matrix-typed edge-node metrics
// against golden JSON files in ./testdata, using fake Kubernetes and
// metrics clientsets.
func TestGetNamedMetricsOverTime(t *testing.T) {
tests := []struct {
metrics []string
filter string
expected string
}{
{
metrics: []string{"node_cpu_usage", "node_memory_usage_wo_cache"},
filter: ".*",
expected: "metrics-matrix-1.json",
},
{
metrics: []string{"node_cpu_usage", "node_cpu_utilisation"},
filter: "edgenode-2",
expected: "metrics-matrix-2.json",
},
{
metrics: []string{"node_memory_usage_wo_cache", "node_memory_utilisation"},
filter: "edgenode-1|edgenode-2",
expected: "metrics-matrix-3.json",
},
}
// Two fake edge nodes sharing the 8-CPU / 8Gi capacity fixture.
fakeK8sClient := fakek8s.NewSimpleClientset(node1, node2)
informer := informers.NewSharedInformerFactory(fakeK8sClient, 0)
informer.Core().V1().Nodes().Informer().GetIndexer().Add(node1)
informer.Core().V1().Nodes().Informer().GetIndexer().Add(node2)
fakeMetricsclient := &fakemetricsclient.Clientset{}
layout := "2006-01-02T15:04:05.000Z"
str := "2021-01-25T12:34:56.789Z"
metricsTime, _ := time.Parse(layout, str)
// Stub the metrics API "list nodes" call with fixed usage numbers
// (edgenode-1: 1 core / 1Mi; edgenode-2: 2 cores / 2Mi).
fakeMetricsclient.AddReactor("list", "nodes", func(action core.Action) (handled bool, ret runtime.Object, err error) {
metrics := &metricsV1beta1.NodeMetricsList{}
nodeMetric1 := metricsV1beta1.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-1",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Timestamp: metav1.Time{Time: metricsTime},
Window: metav1.Duration{Duration: time.Minute},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(1000),
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(1024*1024),
resource.BinarySI),
},
}
nodeMetric2 := metricsV1beta1.NodeMetrics{
ObjectMeta: metav1.ObjectMeta{
Name: "edgenode-2",
Labels: map[string]string{
"node-role.kubernetes.io/edge": "",
},
},
Timestamp: metav1.Time{Time: metricsTime},
Window: metav1.Duration{Duration: time.Minute},
Usage: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(
int64(2000),
resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(
int64(2*1024*1024),
resource.BinarySI),
},
}
metrics.Items = append(metrics.Items, nodeMetric1)
metrics.Items = append(metrics.Items, nodeMetric2)
return true, metrics, nil
})
for i, tt := range tests {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
// Load the golden expectation and compare against the live result.
expected := make([]monitoring.Metric, 0)
err := jsonFromFile(tt.expected, &expected)
if err != nil {
t.Fatal(err)
}
client := NewMetricsServer(fakeK8sClient, true, fakeMetricsclient)
result := client.GetNamedMetricsOverTime(tt.metrics, time.Now().Add(-time.Minute*3), time.Now(), time.Minute, monitoring.NodeOption{ResourceFilter: tt.filter})
if diff := cmp.Diff(result, expected); diff != "" {
t.Fatalf("%T differ (-got, +want): %s", expected, diff)
}
})
}
}
// jsonFromFile reads the JSON fixture named expectedFile from the local
// ./testdata directory and unmarshals it into the value pointed to by
// expectedJsonPtr. It returns any read or decode error unmodified.
func jsonFromFile(expectedFile string, expectedJsonPtr interface{}) error {
	data, err := ioutil.ReadFile(fmt.Sprintf("./testdata/%s", expectedFile))
	if err != nil {
		return err
	}
	return jsoniter.Unmarshal(data, expectedJsonPtr)
}
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"1"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2"
]
]
}
]
}
},
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"1048576"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2097152"
]
]
}
]
}
}
]
\ No newline at end of file
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2"
]
]
}
]
}
},
{
"metric_name": "node_cpu_utilisation",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"0.25"
]
]
}
]
}
}
]
\ No newline at end of file
[
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"1048576"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"2097152"
]
]
}
]
}
},
{
"metric_name": "node_memory_utilisation",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"values": [
[
1611578096,
"0.0001220703125"
]
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"values": [
[
1611578096,
"0.000244140625"
]
]
}
]
}
}
]
\ No newline at end of file
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"1"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2"
]
}
]
}
},
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"1048576"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2097152"
]
}
]
}
}
]
\ No newline at end of file
[
{
"metric_name": "node_cpu_usage",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2"
]
}
]
}
},
{
"metric_name": "node_cpu_utilisation",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"0.25"
]
}
]
}
}
]
\ No newline at end of file
[
{
"metric_name": "node_memory_usage_wo_cache",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"1048576"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"2097152"
]
}
]
}
},
{
"metric_name": "node_memory_utilisation",
"data": {
"resultType": "vector",
"result": [
{
"metric": {
"node": "edgenode-1",
"role": "edge"
},
"value": [
1611578096,
"0.0001220703125"
]
},
{
"metric": {
"node": "edgenode-2",
"role": "edge"
},
"value": [
1611578096,
"0.000244140625"
]
}
]
}
}
]
\ No newline at end of file
...@@ -120,7 +120,7 @@ func generateSwaggerJson() []byte { ...@@ -120,7 +120,7 @@ func generateSwaggerJson() []byte {
urlruntime.Must(devopsv1alpha2.AddToContainer(container, informerFactory.KubeSphereSharedInformerFactory(), &fakedevops.Devops{}, nil, clientsets.KubeSphere(), fakes3.NewFakeS3(), "", nil)) urlruntime.Must(devopsv1alpha2.AddToContainer(container, informerFactory.KubeSphereSharedInformerFactory(), &fakedevops.Devops{}, nil, clientsets.KubeSphere(), fakes3.NewFakeS3(), "", nil))
urlruntime.Must(devopsv1alpha3.AddToContainer(container, &fakedevops.Devops{}, clientsets.Kubernetes(), clientsets.KubeSphere(), informerFactory.KubeSphereSharedInformerFactory(), informerFactory.KubernetesSharedInformerFactory())) urlruntime.Must(devopsv1alpha3.AddToContainer(container, &fakedevops.Devops{}, clientsets.Kubernetes(), clientsets.KubeSphere(), informerFactory.KubeSphereSharedInformerFactory(), informerFactory.KubernetesSharedInformerFactory()))
urlruntime.Must(iamv1alpha2.AddToContainer(container, nil, nil, group.New(informerFactory, clientsets.KubeSphere(), clientsets.Kubernetes()), nil)) urlruntime.Must(iamv1alpha2.AddToContainer(container, nil, nil, group.New(informerFactory, clientsets.KubeSphere(), clientsets.Kubernetes()), nil))
urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, informerFactory, nil)) urlruntime.Must(monitoringv1alpha3.AddToContainer(container, clientsets.Kubernetes(), nil, nil, informerFactory, nil))
urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, openpitrix.NewMockClient(nil))) urlruntime.Must(openpitrixv1.AddToContainer(container, informerFactory, openpitrix.NewMockClient(nil)))
urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes())) urlruntime.Must(operationsv1alpha2.AddToContainer(container, clientsets.Kubernetes()))
urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory, "")) urlruntime.Must(resourcesv1alpha2.AddToContainer(container, clientsets.Kubernetes(), informerFactory, ""))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册