未验证 提交 55202ce7 编写于 作者: C Cai Yudong 提交者: GitHub

Support show knowhere prometheus metrics (#23102)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 5f4673fd
......@@ -47,14 +47,12 @@ import (
)
// all milvus related metrics is in a separate registry
var Registry *prometheus.Registry
var Registry *metrics.MilvusRegistry
func init() {
Registry = prometheus.NewRegistry()
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())
metrics.RegisterEtcdMetrics(Registry)
metrics.RegisterMq(Registry)
Registry = metrics.NewMilvusRegistry()
metrics.RegisterEtcdMetrics(Registry.GoRegistry)
metrics.RegisterMq(Registry.GoRegistry)
}
func stopRocksmq() {
......@@ -95,7 +93,7 @@ func runComponent[T component](ctx context.Context,
wg.Wait()
healthz.Register(role)
metricRegister(Registry)
metricRegister(Registry.GoRegistry)
return role
}
......@@ -193,7 +191,7 @@ func (mr *MilvusRoles) setupLogger() {
}
// Register serves prometheus http service
func setupPrometheusHTTPServer(r *prometheus.Registry) {
func setupPrometheusHTTPServer(r *metrics.MilvusRegistry) {
http.Register(&http.Handler{
Path: "/metrics",
Handler: promhttp.HandlerFor(r, promhttp.HandlerOpts{}),
......
......@@ -26,6 +26,7 @@ set(SEGCORE_FILES
FieldIndexing.cpp
InsertRecord.cpp
Reduce.cpp
metrics_c.cpp
plan_c.cpp
reduce_c.cpp
load_index_c.cpp
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#include <string>
#include "knowhere/comp/prometheus_client.h"
#include "segcore/metrics_c.h"
char*
GetKnowhereMetrics() {
auto str = knowhere::prometheusClient->GetMetrics();
auto len = str.length();
char* res = (char*)malloc(len + 1);
memcpy(res, str.data(), len);
return res;
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
char*
GetKnowhereMetrics();
#ifdef __cplusplus
}
#endif
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
/*
#cgo pkg-config: milvus_segcore milvus_common
#include <stdlib.h>
#include "segcore/metrics_c.h"
*/
import "C"
import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"sort"
"strings"
"sync"
"unsafe"
dto "github.com/prometheus/client_model/go"
)
// metricSorter is a sortable slice of *dto.Metric.
type metricSorter []*dto.Metric
func (s metricSorter) Len() int {
return len(s)
}
func (s metricSorter) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s metricSorter) Less(i, j int) bool {
if len(s[i].Label) != len(s[j].Label) {
// This should not happen. The metrics are
// inconsistent. However, we have to deal with the fact, as
// people might use custom collectors or metric family injection
// to create inconsistent metrics. So let's simply compare the
// number of labels in this case. That will still yield
// reproducible sorting.
return len(s[i].Label) < len(s[j].Label)
}
for n, lp := range s[i].Label {
vi := lp.GetValue()
vj := s[j].Label[n].GetValue()
if vi != vj {
return vi < vj
}
}
// We should never arrive here. Multiple metrics with the same
// label set in the same scrape will lead to undefined ingestion
// behavior. However, as above, we have to provide stable sorting
// here, even for inconsistent metrics. So sort equal metrics
// by their timestamp, with missing timestamps (implying "now")
// coming last.
if s[i].TimestampMs == nil {
return false
}
if s[j].TimestampMs == nil {
return true
}
return s[i].GetTimestampMs() < s[j].GetTimestampMs()
}
// NormalizeMetricFamilies returns a MetricFamily slice with empty
// MetricFamilies pruned and the remaining MetricFamilies sorted by name within
// the slice, with the contained Metrics sorted within each MetricFamily.
func NormalizeMetricFamilies(metricFamiliesByName map[string]*dto.MetricFamily) []*dto.MetricFamily {
for _, mf := range metricFamiliesByName {
sort.Sort(metricSorter(mf.Metric))
}
names := make([]string, 0, len(metricFamiliesByName))
for name, mf := range metricFamiliesByName {
if len(mf.Metric) > 0 {
names = append(names, name)
}
}
sort.Strings(names)
result := make([]*dto.MetricFamily, 0, len(names))
for _, name := range names {
result = append(result, metricFamiliesByName[name])
}
return result
}
func NewCRegistry() *CRegistry {
return &CRegistry{
Registry: prometheus.NewRegistry(),
}
}
// only re-write the implementation of Gather()
type CRegistry struct {
*prometheus.Registry
mtx sync.RWMutex
}
// Gather implements Gatherer.
func (r *CRegistry) Gather() (res []*dto.MetricFamily, err error) {
var (
parser expfmt.TextParser
)
r.mtx.RLock()
cMetricsStr := C.GetKnowhereMetrics()
metricsStr := C.GoString(cMetricsStr)
C.free(unsafe.Pointer(cMetricsStr))
out, err := parser.TextToMetricFamilies(strings.NewReader(metricsStr))
if err != nil {
log.Error("fail to parse prometheus metrics")
return
}
res = NormalizeMetricFamilies(out)
return
}
......@@ -18,20 +18,18 @@ package metrics
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
)
func TestRegisterMetrics(t *testing.T) {
r := prometheus.NewRegistry()
r := NewMilvusRegistry()
// Make sure it doesn't panic.
RegisterRootCoord(r)
RegisterDataNode(r)
RegisterDataCoord(r)
RegisterIndexNode(r)
RegisterProxy(r)
RegisterQueryNode(r)
RegisterQueryCoord(r)
RegisterEtcdMetrics(r)
RegisterMq(r)
RegisterRootCoord(r.GoRegistry)
RegisterDataNode(r.GoRegistry)
RegisterDataCoord(r.GoRegistry)
RegisterIndexNode(r.GoRegistry)
RegisterProxy(r.GoRegistry)
RegisterQueryNode(r.GoRegistry)
RegisterQueryCoord(r.GoRegistry)
RegisterEtcdMetrics(r.GoRegistry)
RegisterMq(r.GoRegistry)
}
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)
func NewMilvusRegistry() *MilvusRegistry {
r := &MilvusRegistry{
GoRegistry: prometheus.NewRegistry(),
CRegistry: NewCRegistry(),
}
r.GoRegistry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
r.GoRegistry.MustRegister(prometheus.NewGoCollector())
return r
}
// re-write the implementation of Gather()
type MilvusRegistry struct {
GoRegistry *prometheus.Registry
CRegistry *CRegistry
}
// Gather implements Gatherer.
func (r *MilvusRegistry) Gather() ([]*dto.MetricFamily, error) {
var res []*dto.MetricFamily
res1, err := r.GoRegistry.Gather()
if err != nil {
return res, err
}
res2, err := r.CRegistry.Gather()
if err != nil {
return res, err
}
res = append(res1, res2...)
return res, nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册