未验证 提交 787d4f8b 编写于 作者: X Xiaofan 提交者: GitHub

Refine prometheus metrics and etcd log (#16084)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 31c5b286
......@@ -51,10 +51,20 @@ import (
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/prometheus/client_golang/prometheus"
)
var Params paramtable.ComponentParam
// all milvus related metrics is in a separate registry
var Registry *prometheus.Registry
func init() {
Registry = prometheus.NewRegistry()
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())
}
func newMsgFactory(localMsg bool) msgstream.Factory {
if localMsg {
return msgstream.NewRmsFactory()
......@@ -117,7 +127,7 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone
}()
wg.Wait()
metrics.RegisterRootCoord()
metrics.RegisterRootCoord(Registry)
return rc
}
......@@ -149,7 +159,7 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string
}()
wg.Wait()
metrics.RegisterProxy()
metrics.RegisterProxy(Registry)
return pn
}
......@@ -180,7 +190,7 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon
}()
wg.Wait()
metrics.RegisterQueryCoord()
metrics.RegisterQueryCoord(Registry)
return qs
}
......@@ -212,7 +222,7 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st
}()
wg.Wait()
metrics.RegisterQueryNode()
metrics.RegisterQueryNode(Registry)
return qn
}
......@@ -245,7 +255,7 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone
}()
wg.Wait()
metrics.RegisterDataCoord()
metrics.RegisterDataCoord(Registry)
return ds
}
......@@ -277,7 +287,7 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str
}()
wg.Wait()
metrics.RegisterDataNode()
metrics.RegisterDataNode(Registry)
return dn
}
......@@ -307,7 +317,7 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon
}()
wg.Wait()
metrics.RegisterIndexCoord()
metrics.RegisterIndexCoord(Registry)
return is
}
......@@ -338,7 +348,7 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st
}()
wg.Wait()
metrics.RegisterIndexNode()
metrics.RegisterIndexNode(Registry)
return in
}
......@@ -486,7 +496,7 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
http.HandleFunc(healthz.HealthzRouterPath, standaloneHealthzHandler)
}
metrics.ServeHTTP()
metrics.ServeHTTP(Registry)
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
......
......@@ -26,7 +26,6 @@ import (
"time"
ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/milvus-io/milvus/internal/datacoord"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
......@@ -151,9 +150,7 @@ func (s *Server) startGrpcLoop(grpcPort int) {
grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
grpc.UnaryInterceptor(ot.UnaryServerInterceptor(opts...)),
grpc.StreamInterceptor(ot.StreamServerInterceptor(opts...)))
//grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor))
datapb.RegisterDataCoordServer(s.grpcServer, s)
grpc_prometheus.Register(s.grpcServer)
go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
if err := s.grpcServer.Serve(lis); err != nil {
s.grpcErrChan <- err
......
......@@ -124,15 +124,10 @@ var (
)
//RegisterDataCoord registers DataCoord metrics
func RegisterDataCoord() {
prometheus.MustRegister(DataCoordNumDataNodes)
prometheus.MustRegister(DataCoordNumSegments)
prometheus.MustRegister(DataCoordNumCollections)
prometheus.MustRegister(DataCoordNumStoredRows)
prometheus.MustRegister(DataCoordSyncUTC)
// prometheus.MustRegister(DataCoordSegmentSizeRatio)
// prometheus.MustRegister(DataCoordSegmentFlushDuration)
// prometheus.MustRegister(DataCoordCompactDuration)
// prometheus.MustRegister(DataCoordCompactLoad)
// prometheus.MustRegister(DataCoordNumCompactionTask)
func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(DataCoordNumDataNodes)
registry.MustRegister(DataCoordNumSegments)
registry.MustRegister(DataCoordNumCollections)
registry.MustRegister(DataCoordNumStoredRows)
registry.MustRegister(DataCoordSyncUTC)
}
......@@ -196,21 +196,19 @@ var (
)
//RegisterDataNode registers DataNode metrics
func RegisterDataNode() {
prometheus.MustRegister(DataNodeNumFlowGraphs)
prometheus.MustRegister(DataNodeConsumeMsgRowsCount)
prometheus.MustRegister(DataNodeFlushedSize)
//prometheus.MustRegister(DataNodeNumDmlChannels)
//prometheus.MustRegister(DataNodeNumDeltaChannels)
prometheus.MustRegister(DataNodeNumConsumers)
prometheus.MustRegister(DataNodeNumProducers)
prometheus.MustRegister(DataNodeTimeSync)
prometheus.MustRegister(DataNodeSegmentRowsCount)
prometheus.MustRegister(DataNodeNumUnflushedSegments)
prometheus.MustRegister(DataNodeFlushSegmentLatency)
prometheus.MustRegister(DataNodeSave2StorageLatency)
prometheus.MustRegister(DataNodeFlushSegmentCount)
prometheus.MustRegister(DataNodeAutoFlushSegmentCount)
prometheus.MustRegister(DataNodeCompactionLatency)
prometheus.MustRegister(DataNodeFlushSegmentsReqCounter)
func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeNumFlowGraphs)
registry.MustRegister(DataNodeConsumeMsgRowsCount)
registry.MustRegister(DataNodeFlushedSize)
registry.MustRegister(DataNodeNumConsumers)
registry.MustRegister(DataNodeNumProducers)
registry.MustRegister(DataNodeTimeSync)
registry.MustRegister(DataNodeSegmentRowsCount)
registry.MustRegister(DataNodeNumUnflushedSegments)
registry.MustRegister(DataNodeFlushSegmentLatency)
registry.MustRegister(DataNodeSave2StorageLatency)
registry.MustRegister(DataNodeFlushSegmentCount)
registry.MustRegister(DataNodeAutoFlushSegmentCount)
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeFlushSegmentsReqCounter)
}
......@@ -51,8 +51,8 @@ var (
)
//RegisterIndexCoord registers IndexCoord metrics
func RegisterIndexCoord() {
prometheus.MustRegister(IndexCoordIndexRequestCounter)
prometheus.MustRegister(IndexCoordIndexTaskCounter)
prometheus.MustRegister(IndexCoordIndexNodeNum)
func RegisterIndexCoord(registry *prometheus.Registry) {
registry.MustRegister(IndexCoordIndexRequestCounter)
registry.MustRegister(IndexCoordIndexTaskCounter)
registry.MustRegister(IndexCoordIndexNodeNum)
}
......@@ -77,11 +77,11 @@ var (
)
//RegisterIndexNode registers IndexNode metrics
func RegisterIndexNode() {
prometheus.MustRegister(IndexNodeBuildIndexTaskCounter)
prometheus.MustRegister(IndexNodeLoadBinlogLatency)
prometheus.MustRegister(IndexNodeDecodeBinlogLatency)
prometheus.MustRegister(IndexNodeKnowhereBuildIndexLatency)
prometheus.MustRegister(IndexNodeEncodeIndexFileLatency)
prometheus.MustRegister(IndexNodeSaveIndexFileLatency)
func RegisterIndexNode(registry *prometheus.Registry) {
registry.MustRegister(IndexNodeBuildIndexTaskCounter)
registry.MustRegister(IndexNodeLoadBinlogLatency)
registry.MustRegister(IndexNodeDecodeBinlogLatency)
registry.MustRegister(IndexNodeKnowhereBuildIndexLatency)
registry.MustRegister(IndexNodeEncodeIndexFileLatency)
registry.MustRegister(IndexNodeSaveIndexFileLatency)
}
......@@ -18,7 +18,6 @@ package metrics
import (
"net/http"
// nolint:gosec
_ "net/http/pprof"
......@@ -74,8 +73,9 @@ var (
)
//ServeHTTP serves prometheus http service
func ServeHTTP() {
http.Handle("/metrics", promhttp.Handler())
func ServeHTTP(r *prometheus.Registry) {
http.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{}))
http.Handle("/metrics_default", promhttp.Handler())
go func() {
if err := http.ListenAndServe(":9091", nil); err != nil {
log.Error("handle metrics failed", zap.Error(err))
......
......@@ -18,16 +18,19 @@ package metrics
import (
"testing"
"github.com/prometheus/client_golang/prometheus"
)
func TestRegisterMetrics(t *testing.T) {
r := prometheus.NewRegistry()
// Make sure it doesn't panic.
RegisterRootCoord()
RegisterDataNode()
RegisterDataCoord()
RegisterIndexNode()
RegisterIndexCoord()
RegisterProxy()
RegisterQueryNode()
RegisterQueryCoord()
RegisterRootCoord(r)
RegisterDataNode(r)
RegisterDataCoord(r)
RegisterIndexNode(r)
RegisterIndexCoord(r)
RegisterProxy(r)
RegisterQueryNode(r)
RegisterQueryCoord(r)
}
......@@ -273,38 +273,36 @@ var (
)
//RegisterProxy registers Proxy metrics
func RegisterProxy() {
prometheus.MustRegister(ProxySearchCount)
prometheus.MustRegister(ProxyInsertCount)
prometheus.MustRegister(ProxySearchVectors)
prometheus.MustRegister(ProxyInsertVectors)
//prometheus.MustRegister(ProxyLinkedSDKs)
prometheus.MustRegister(ProxySearchLatency)
prometheus.MustRegister(ProxySearchLatencyPerNQ)
prometheus.MustRegister(ProxySendMessageLatency)
prometheus.MustRegister(ProxyWaitForSearchResultLatency)
prometheus.MustRegister(ProxyReduceSearchResultLatency)
prometheus.MustRegister(ProxyDecodeSearchResultLatency)
prometheus.MustRegister(ProxyMsgStreamObjectsForPChan)
prometheus.MustRegister(ProxyMsgStreamObjectsForSearch)
prometheus.MustRegister(ProxyInsertLatency)
prometheus.MustRegister(ProxySendInsertReqLatency)
prometheus.MustRegister(ProxyCacheHitCounter)
prometheus.MustRegister(ProxyUpdateCacheLatency)
prometheus.MustRegister(ProxySyncTimeTick)
prometheus.MustRegister(ProxyApplyPrimaryKeyLatency)
prometheus.MustRegister(ProxyApplyTimestampLatency)
prometheus.MustRegister(ProxyDDLFunctionCall)
prometheus.MustRegister(ProxyDQLFunctionCall)
prometheus.MustRegister(ProxyDMLFunctionCall)
prometheus.MustRegister(ProxyDDLReqLatency)
prometheus.MustRegister(ProxyDMLReqLatency)
prometheus.MustRegister(ProxyDQLReqLatency)
func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxySearchCount)
registry.MustRegister(ProxyInsertCount)
registry.MustRegister(ProxySearchVectors)
registry.MustRegister(ProxyInsertVectors)
registry.MustRegister(ProxySearchLatency)
registry.MustRegister(ProxySearchLatencyPerNQ)
registry.MustRegister(ProxySendMessageLatency)
registry.MustRegister(ProxyWaitForSearchResultLatency)
registry.MustRegister(ProxyReduceSearchResultLatency)
registry.MustRegister(ProxyDecodeSearchResultLatency)
registry.MustRegister(ProxyMsgStreamObjectsForPChan)
registry.MustRegister(ProxyMsgStreamObjectsForSearch)
registry.MustRegister(ProxyInsertLatency)
registry.MustRegister(ProxySendInsertReqLatency)
registry.MustRegister(ProxyCacheHitCounter)
registry.MustRegister(ProxyUpdateCacheLatency)
registry.MustRegister(ProxySyncTimeTick)
registry.MustRegister(ProxyApplyPrimaryKeyLatency)
registry.MustRegister(ProxyApplyTimestampLatency)
registry.MustRegister(ProxyDDLFunctionCall)
registry.MustRegister(ProxyDQLFunctionCall)
registry.MustRegister(ProxyDMLFunctionCall)
registry.MustRegister(ProxyDDLReqLatency)
registry.MustRegister(ProxyDMLReqLatency)
registry.MustRegister(ProxyDQLReqLatency)
}
......@@ -112,15 +112,15 @@ var (
)
//RegisterQueryCoord registers QueryCoord metrics
func RegisterQueryCoord() {
prometheus.MustRegister(QueryCoordNumCollections)
prometheus.MustRegister(QueryCoordNumEntities)
prometheus.MustRegister(QueryCoordLoadCount)
prometheus.MustRegister(QueryCoordReleaseCount)
prometheus.MustRegister(QueryCoordLoadLatency)
prometheus.MustRegister(QueryCoordReleaseLatency)
prometheus.MustRegister(QueryCoordNumChildTasks)
prometheus.MustRegister(QueryCoordNumParentTasks)
prometheus.MustRegister(QueryCoordChildTaskLatency)
prometheus.MustRegister(QueryCoordNumQueryNodes)
func RegisterQueryCoord(registry *prometheus.Registry) {
registry.MustRegister(QueryCoordNumCollections)
registry.MustRegister(QueryCoordNumEntities)
registry.MustRegister(QueryCoordLoadCount)
registry.MustRegister(QueryCoordReleaseCount)
registry.MustRegister(QueryCoordLoadLatency)
registry.MustRegister(QueryCoordReleaseLatency)
registry.MustRegister(QueryCoordNumChildTasks)
registry.MustRegister(QueryCoordNumParentTasks)
registry.MustRegister(QueryCoordChildTaskLatency)
registry.MustRegister(QueryCoordNumQueryNodes)
}
......@@ -200,21 +200,21 @@ var (
)
//RegisterQueryNode registers QueryNode metrics
func RegisterQueryNode() {
prometheus.MustRegister(QueryNodeNumCollections)
prometheus.MustRegister(QueryNodeNumPartitions)
prometheus.MustRegister(QueryNodeNumSegments)
prometheus.MustRegister(QueryNodeNumDmlChannels)
prometheus.MustRegister(QueryNodeNumDeltaChannels)
prometheus.MustRegister(QueryNodeNumConsumers)
prometheus.MustRegister(QueryNodeSQCount)
prometheus.MustRegister(QueryNodeSQReqLatency)
prometheus.MustRegister(QueryNodeSQLatencyInQueue)
prometheus.MustRegister(QueryNodeSQSegmentLatency)
prometheus.MustRegister(QueryNodeSQSegmentLatencyInCore)
prometheus.MustRegister(QueryNodeTranslateHitsLatency)
prometheus.MustRegister(QueryNodeReduceLatency)
prometheus.MustRegister(QueryNodeLoadSegmentLatency)
prometheus.MustRegister(QueryNodeServiceTime)
prometheus.MustRegister(QueryNodeNumFlowGraphs)
func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeNumCollections)
registry.MustRegister(QueryNodeNumPartitions)
registry.MustRegister(QueryNodeNumSegments)
registry.MustRegister(QueryNodeNumDmlChannels)
registry.MustRegister(QueryNodeNumDeltaChannels)
registry.MustRegister(QueryNodeNumConsumers)
registry.MustRegister(QueryNodeSQCount)
registry.MustRegister(QueryNodeSQReqLatency)
registry.MustRegister(QueryNodeSQLatencyInQueue)
registry.MustRegister(QueryNodeSQSegmentLatency)
registry.MustRegister(QueryNodeSQSegmentLatencyInCore)
registry.MustRegister(QueryNodeTranslateHitsLatency)
registry.MustRegister(QueryNodeReduceLatency)
registry.MustRegister(QueryNodeLoadSegmentLatency)
registry.MustRegister(QueryNodeServiceTime)
registry.MustRegister(QueryNodeNumFlowGraphs)
}
......@@ -266,45 +266,45 @@ var (
)
//RegisterRootCoord registers RootCoord metrics
func RegisterRootCoord() {
prometheus.MustRegister(RootCoordProxyLister)
func RegisterRootCoord(registry *prometheus.Registry) {
registry.Register(RootCoordProxyLister)
// for grpc
prometheus.MustRegister(RootCoordCreateCollectionCounter)
prometheus.MustRegister(RootCoordDropCollectionCounter)
prometheus.MustRegister(RootCoordHasCollectionCounter)
prometheus.MustRegister(RootCoordDescribeCollectionCounter)
prometheus.MustRegister(RootCoordShowCollectionsCounter)
prometheus.MustRegister(RootCoordCreatePartitionCounter)
prometheus.MustRegister(RootCoordDropPartitionCounter)
prometheus.MustRegister(RootCoordHasPartitionCounter)
prometheus.MustRegister(RootCoordShowPartitionsCounter)
prometheus.MustRegister(RootCoordCreateIndexCounter)
prometheus.MustRegister(RootCoordDropIndexCounter)
prometheus.MustRegister(RootCoordDescribeIndexCounter)
prometheus.MustRegister(RootCoordDescribeSegmentCounter)
prometheus.MustRegister(RootCoordShowSegmentsCounter)
registry.MustRegister(RootCoordCreateCollectionCounter)
registry.MustRegister(RootCoordDropCollectionCounter)
registry.MustRegister(RootCoordHasCollectionCounter)
registry.MustRegister(RootCoordDescribeCollectionCounter)
registry.MustRegister(RootCoordShowCollectionsCounter)
registry.MustRegister(RootCoordCreatePartitionCounter)
registry.MustRegister(RootCoordDropPartitionCounter)
registry.MustRegister(RootCoordHasPartitionCounter)
registry.MustRegister(RootCoordShowPartitionsCounter)
registry.MustRegister(RootCoordCreateIndexCounter)
registry.MustRegister(RootCoordDropIndexCounter)
registry.MustRegister(RootCoordDescribeIndexCounter)
registry.MustRegister(RootCoordDescribeSegmentCounter)
registry.MustRegister(RootCoordShowSegmentsCounter)
// for time tick
prometheus.MustRegister(RootCoordInsertChannelTimeTick)
registry.MustRegister(RootCoordInsertChannelTimeTick)
//prometheus.MustRegister(PanicCounter)
prometheus.MustRegister(RootCoordSyncTimeTickLatency)
registry.MustRegister(RootCoordSyncTimeTickLatency)
// for DDL latency
prometheus.MustRegister(RootCoordDDLReadTypeLatency)
prometheus.MustRegister(RootCoordDDLWriteTypeLatency)
registry.MustRegister(RootCoordDDLReadTypeLatency)
registry.MustRegister(RootCoordDDLWriteTypeLatency)
// for allocator
prometheus.MustRegister(RootCoordIDAllocCounter)
prometheus.MustRegister(RootCoordTimestampAllocCounter)
prometheus.MustRegister(RootCoordETCDTimestampAllocCounter)
registry.MustRegister(RootCoordIDAllocCounter)
registry.MustRegister(RootCoordTimestampAllocCounter)
registry.MustRegister(RootCoordETCDTimestampAllocCounter)
// for collection
prometheus.MustRegister(RootCoordNumOfCollections)
prometheus.MustRegister(RootCoordNumOfPartitions)
prometheus.MustRegister(RootCoordNumOfSegments)
prometheus.MustRegister(RootCoordNumOfIndexedSegments)
registry.MustRegister(RootCoordNumOfCollections)
registry.MustRegister(RootCoordNumOfPartitions)
registry.MustRegister(RootCoordNumOfSegments)
registry.MustRegister(RootCoordNumOfIndexedSegments)
prometheus.MustRegister(RootCoordNumOfDMLChannel)
prometheus.MustRegister(RootCoordNumOfMsgStream)
registry.MustRegister(RootCoordNumOfDMLChannel)
registry.MustRegister(RootCoordNumOfMsgStream)
}
......@@ -31,12 +31,12 @@ import (
"time"
ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/milvus-io/milvus/internal/util/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/sessionutil"
......@@ -108,6 +108,14 @@ const (
sleepDuration = time.Millisecond * 200
)
var Registry *prometheus.Registry
func init() {
Registry = prometheus.NewRegistry()
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())
}
func newMsgFactory(localMsg bool) msgstream.Factory {
if localMsg {
return msgstream.NewRmsFactory()
......@@ -141,7 +149,7 @@ func runRootCoord(ctx context.Context, localMsg bool) *grpcrootcoord.Server {
}()
wg.Wait()
metrics.RegisterRootCoord()
metrics.RegisterRootCoord(Registry)
return rc
}
......@@ -172,7 +180,7 @@ func runQueryCoord(ctx context.Context, localMsg bool) *grpcquerycoord.Server {
}()
wg.Wait()
metrics.RegisterQueryCoord()
metrics.RegisterQueryCoord(Registry)
return qs
}
......@@ -204,7 +212,7 @@ func runQueryNode(ctx context.Context, localMsg bool, alias string) *grpcqueryno
}()
wg.Wait()
metrics.RegisterQueryNode()
metrics.RegisterQueryNode(Registry)
return qn
}
......@@ -231,7 +239,7 @@ func runDataCoord(ctx context.Context, localMsg bool) *grpcdatacoordclient.Serve
}()
wg.Wait()
metrics.RegisterDataCoord()
metrics.RegisterDataCoord(Registry)
return ds
}
......@@ -263,7 +271,7 @@ func runDataNode(ctx context.Context, localMsg bool, alias string) *grpcdatanode
}()
wg.Wait()
metrics.RegisterDataNode()
metrics.RegisterDataNode(Registry)
return dn
}
......@@ -293,7 +301,7 @@ func runIndexCoord(ctx context.Context, localMsg bool) *grpcindexcoord.Server {
}()
wg.Wait()
metrics.RegisterIndexCoord()
metrics.RegisterIndexCoord(Registry)
return is
}
......@@ -329,7 +337,7 @@ func runIndexNode(ctx context.Context, localMsg bool, alias string) *grpcindexno
}()
wg.Wait()
metrics.RegisterIndexNode()
metrics.RegisterIndexNode(Registry)
return in
}
......
package etcd
import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
"go.uber.org/zap"
)
// EtcdServer is the singleton of embedded etcd server
var (
initOnce sync.Once
closeOnce sync.Once
etcdServer *embed.Etcd
)
// GetEmbedEtcdClient returns client of embed etcd server
func GetEmbedEtcdClient() (*clientv3.Client, error) {
client := v3client.New(etcdServer.Server)
return client, nil
}
// InitEtcdServer initializes embedded etcd server singleton.
func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error {
if etcdCfg.UseEmbedEtcd {
var initError error
initOnce.Do(func() {
path := etcdCfg.ConfigPath
var cfg *embed.Config
if len(path) > 0 {
cfgFromFile, err := embed.ConfigFromFile(path)
if err != nil {
initError = err
}
cfg = cfgFromFile
} else {
cfg = embed.NewConfig()
}
cfg.Dir = etcdCfg.DataDir
cfg.LogOutputs = []string{etcdCfg.EtcdLogPath}
cfg.LogLevel = etcdCfg.EtcdLogLevel
e, err := embed.StartEtcd(cfg)
if err != nil {
initError = err
}
etcdServer = e
log.Info("finish init Etcd config", zap.String("path", path), zap.String("data", etcdCfg.DataDir))
})
return initError
}
return nil
}
// StopEtcdServer stops embedded etcd server singleton.
func StopEtcdServer() {
if etcdServer != nil {
closeOnce.Do(func() {
etcdServer.Close()
})
}
}
......@@ -19,53 +19,10 @@ package etcd
import (
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/paramtable"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
)
// EtcdServer is the singleton of embedded etcd server
var EtcdServer *embed.Etcd
// InitEtcdServer initializes embedded etcd server singleton.
func InitEtcdServer(etcdCfg *paramtable.EtcdConfig) error {
if etcdCfg.UseEmbedEtcd {
path := etcdCfg.ConfigPath
log.Info("Setting Etcd config", zap.String("path", path), zap.String("data", etcdCfg.DataDir))
var cfg *embed.Config
if len(path) > 0 {
cfgFromFile, err := embed.ConfigFromFile(path)
if err != nil {
return err
}
cfg = cfgFromFile
} else {
cfg = embed.NewConfig()
}
cfg.Dir = etcdCfg.DataDir
cfg.LogOutputs = []string{etcdCfg.EtcdLogPath}
cfg.LogLevel = etcdCfg.EtcdLogLevel
e, err := embed.StartEtcd(cfg)
if err != nil {
return err
}
EtcdServer = e
log.Info("finish init embedded etcd")
}
return nil
}
// StopEtcdServer stops embedded etcd server singleton.
func StopEtcdServer() {
if EtcdServer != nil {
EtcdServer.Close()
}
}
// GetEtcdClient returns etcd client
func GetEtcdClient(cfg *paramtable.EtcdConfig) (*clientv3.Client, error) {
if cfg.UseEmbedEtcd {
......@@ -74,12 +31,6 @@ func GetEtcdClient(cfg *paramtable.EtcdConfig) (*clientv3.Client, error) {
return GetRemoteEtcdClient(cfg.Endpoints)
}
// GetEmbedEtcdClient returns client of embed etcd server
func GetEmbedEtcdClient() (*clientv3.Client, error) {
client := v3client.New(EtcdServer.Server)
return client, nil
}
// GetRemoteEtcdClient returns client of remote etcd by given endpoints
func GetRemoteEtcdClient(endpoints []string) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
......
......@@ -37,10 +37,6 @@ func TestEtcd(t *testing.T) {
defer os.RemoveAll(Params.EtcdCfg.DataDir)
defer StopEtcdServer()
// port is binded
err = InitEtcdServer(&Params.EtcdCfg)
assert.Error(t, err)
etcdCli, err := GetEtcdClient(&Params.EtcdCfg)
assert.NoError(t, err)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册