diff --git a/cmd/components/data_coord.go b/cmd/components/data_coord.go index 94ea2a9785d21dfb94e80582208c449ceee28887..06ca15dacb55a501b549002e25ffb015d81d134f 100644 --- a/cmd/components/data_coord.go +++ b/cmd/components/data_coord.go @@ -19,9 +19,11 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord" ) @@ -60,6 +62,14 @@ func (s *DataCoord) Stop() error { } // GetComponentStates returns DataCoord's states -func (s *DataCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return s.svr.GetComponentStates(ctx, request) +func (s *DataCoord) Health(ctx context.Context) commonpb.StateCode { + resp, err := s.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (s *DataCoord) GetName() string { + return typeutil.DataCoordRole } diff --git a/cmd/components/data_node.go b/cmd/components/data_node.go index 33ae1baf17975f401249f99d1dce20341ded194b..b108e7861212161e155fdbca68b6ac2ba22f2d33 100644 --- a/cmd/components/data_node.go +++ b/cmd/components/data_node.go @@ -19,10 +19,12 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" ) // DataNode implements DataNode grpc server @@ -62,6 +64,14 @@ func (d *DataNode) Stop() error { } // GetComponentStates returns DataNode's states -func (d *DataNode) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return d.svr.GetComponentStates(ctx, request) +func (d *DataNode) Health(ctx context.Context) commonpb.StateCode { + resp, err := d.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (d *DataNode) GetName() string { + return typeutil.DataNodeRole } diff --git a/cmd/components/index_coord.go b/cmd/components/index_coord.go index 9d9366ccefe304f84a65cb80af982b2b19571e02..cc4f715583d63d7383fbd46a3d6d58a841ab4e02 100644 --- a/cmd/components/index_coord.go +++ b/cmd/components/index_coord.go @@ -19,11 +19,13 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" grpcindexcoord "github.com/milvus-io/milvus/internal/distributed/indexcoord" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" ) // IndexCoord implements IndexCoord grpc server @@ -62,6 +64,14 @@ func (s *IndexCoord) Stop() error { } // GetComponentStates returns indexnode's states -func (s *IndexCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return s.svr.GetComponentStates(ctx, request) +func (s *IndexCoord) Health(ctx context.Context) commonpb.StateCode { + resp, err := s.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (s *IndexCoord) GetName() string { + return typeutil.IndexCoordRole } diff --git a/cmd/components/index_node.go b/cmd/components/index_node.go index f7f3a3f7ff6c11b0ee9e7697f19852f573eef375..ca42363a461cc9caa1e4dd2d68069f0855f52b1a 100644 --- a/cmd/components/index_node.go +++ b/cmd/components/index_node.go @@ -19,10 +19,12 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" ) @@ -63,6 +65,14 @@ func (n *IndexNode) Stop() error { } // GetComponentStates returns IndexNode's states -func (n *IndexNode) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return n.svr.GetComponentStates(ctx, request) +func (n *IndexNode) Health(ctx context.Context) commonpb.StateCode { + resp, err := n.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (n *IndexNode) GetName() string { + return typeutil.IndexNodeRole } diff --git a/cmd/components/proxy.go b/cmd/components/proxy.go index 0bd393a8b462f9dd9569bd51a436b96d25689617..bbe9f6c8a63b57b108d8f496a2545763dc441e02 100644 --- a/cmd/components/proxy.go +++ b/cmd/components/proxy.go @@ -19,11 +19,13 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" ) @@ -65,6 +67,14 @@ func (n *Proxy) Stop() error { } // GetComponentStates returns Proxy's states -func (n *Proxy) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return n.svr.GetComponentStates(ctx, request) +func (n *Proxy) Health(ctx context.Context) commonpb.StateCode { + resp, err := n.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (n *Proxy) GetName() string { + return typeutil.ProxyRole } diff --git a/cmd/components/query_coord.go b/cmd/components/query_coord.go index e7e5973986684a1479af380f3122b165a2842a41..aaf282397c10cef6bf3ecd591718f238fdc891ae 100644 --- a/cmd/components/query_coord.go +++ b/cmd/components/query_coord.go @@ -19,10 +19,12 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord" ) @@ -64,6 +66,14 @@ func (qs *QueryCoord) Stop() error { } // GetComponentStates returns QueryCoord's states -func (qs *QueryCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return qs.svr.GetComponentStates(ctx, request) +func (qs *QueryCoord) Health(ctx context.Context) commonpb.StateCode { + resp, err := qs.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (qs *QueryCoord) GetName() string { + return typeutil.QueryCoordRole } diff --git a/cmd/components/query_node.go b/cmd/components/query_node.go index 5845e1be510d2b377ecc64a06b78282053ce24d3..f6f0f65a4b0c6dd59ad24cf696b1bd86a19549f9 100644 --- a/cmd/components/query_node.go +++ b/cmd/components/query_node.go @@ -19,10 +19,12 @@ package components import ( "context" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode" ) @@ -65,6 +67,14 @@ func (q *QueryNode) Stop() error { } // GetComponentStates returns QueryNode's states -func (q *QueryNode) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return q.svr.GetComponentStates(ctx, request) +func (q *QueryNode) Health(ctx context.Context) commonpb.StateCode { + resp, err := q.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (q *QueryNode) GetName() string { + return typeutil.QueryNodeRole } diff --git a/cmd/components/root_coord.go b/cmd/components/root_coord.go index ccb01ecc349ef6306d1104197cb9d33869a14002..ac9ea1693f523418c1c53aacc48166ea736e1697 100644 --- a/cmd/components/root_coord.go +++ b/cmd/components/root_coord.go @@ -20,10 +20,12 @@ import ( "context" "io" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/api/milvuspb" rc "github.com/milvus-io/milvus/internal/distributed/rootcoord" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/opentracing/opentracing-go" "go.uber.org/zap" @@ -69,6 +71,14 @@ func (rc *RootCoord) Stop() error { } // GetComponentStates returns RootCoord's states -func (rc *RootCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) { - return rc.svr.GetComponentStates(ctx, request) +func (rc *RootCoord) Health(ctx context.Context) commonpb.StateCode { + resp, err := rc.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + return commonpb.StateCode_Abnormal + } + return resp.State.GetStateCode() +} + +func (rc *RootCoord) GetName() string { + return typeutil.RootCoordRole } diff --git a/cmd/milvus/run.go b/cmd/milvus/run.go index 88545f1a751451e90b538fc5af70f5bbc47d994e..34856daddbf8334abd1c336e8bcb849310caa167 100644 --- a/cmd/milvus/run.go +++ b/cmd/milvus/run.go @@ -70,7 +70,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { case typeutil.IndexNodeRole: role.EnableIndexNode = true case typeutil.StandaloneRole, typeutil.EmbeddedRole: - role.HasMultipleRoles = true role.EnableRootCoord = true role.EnableProxy = true role.EnableQueryCoord = true @@ -81,7 +80,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) { role.EnableIndexNode = true local = true case roleMixture: - role.HasMultipleRoles = true role.EnableRootCoord = c.enableRootCoord role.EnableQueryCoord = c.enableQueryCoord role.EnableDataCoord = c.enableDataCoord diff --git a/cmd/roles/healthz_handler.go b/cmd/roles/healthz_handler.go deleted file mode 100644 index b964e986c641f305171ee25071b79e1c2fc8d063..0000000000000000000000000000000000000000 --- a/cmd/roles/healthz_handler.go +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package roles - -import ( - "context" - "fmt" - "net/http" - - "github.com/milvus-io/milvus/api/commonpb" - "github.com/milvus-io/milvus/api/milvuspb" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/healthz" - - "go.uber.org/zap" -) - -func unhealthyHandler(w http.ResponseWriter, r *http.Request, reason string) { - w.WriteHeader(http.StatusInternalServerError) - w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) - _, err := fmt.Fprint(w, reason) - if err != nil { - log.Warn("failed to send response", - zap.Error(err)) - } -} - -func healthyHandler(w http.ResponseWriter, r *http.Request) { - var err error - - w.WriteHeader(http.StatusOK) - w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) - _, err = fmt.Fprint(w, "OK") - if err != nil { - log.Warn("failed to send response", - zap.Error(err)) - } -} - -// GetComponentStatesInterface defines the interface that get states from component. -type GetComponentStatesInterface interface { - // GetComponentStates returns the states of component. - GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) -} - -type componentsHealthzHandler struct { - component GetComponentStatesInterface -} - -func (handler *componentsHealthzHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - states, err := handler.component.GetComponentStates(context.Background(), &milvuspb.GetComponentStatesRequest{}) - - if err != nil { - log.Warn("failed to get component states", zap.Error(err)) - unhealthyHandler(w, r, err.Error()) - return - } - - if states == nil { - log.Warn("failed to get component states, states is nil") - unhealthyHandler(w, r, "failed to get states") - return - } - - if states.Status == nil { - log.Warn("failed to get component states, states.Status is nil") - unhealthyHandler(w, r, "failed to get status") - return - } - - if states.Status.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("failed to get component states", - zap.String("ErrorCode", states.Status.ErrorCode.String()), - zap.String("Reason", states.Status.Reason)) - unhealthyHandler(w, r, states.Status.Reason) - return - } - - if states.State == nil { - log.Warn("failed to get component states, states.State is nil") - unhealthyHandler(w, r, "failed to get state") - return - } - - if states.State.StateCode != commonpb.StateCode_Healthy { - log.Warn("component is unhealthy", zap.String("state", states.State.StateCode.String())) - unhealthyHandler(w, r, fmt.Sprintf("state: %s", states.State.StateCode.String())) - return - } - - healthyHandler(w, r) -} diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 8c691ac19be9749087aa1151b6471d7014aae355..d68eb00ac161002671b8c68cbc884930a35240c2 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -18,18 +18,16 @@ package roles import ( "context" - "fmt" - "net/http" "os" "os/signal" "strings" "sync" "syscall" - "time" - "github.com/milvus-io/milvus/api/commonpb" - "github.com/milvus-io/milvus/api/milvuspb" "github.com/milvus-io/milvus/internal/management" + rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" + + "go.uber.org/zap" "github.com/milvus-io/milvus/cmd/components" "github.com/milvus-io/milvus/internal/datacoord" @@ -37,22 +35,19 @@ import ( "github.com/milvus-io/milvus/internal/indexcoord" "github.com/milvus-io/milvus/internal/indexnode" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/management/healthz" "github.com/milvus-io/milvus/internal/metrics" - rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server" "github.com/milvus-io/milvus/internal/proxy" querycoord "github.com/milvus-io/milvus/internal/querycoordv2" "github.com/milvus-io/milvus/internal/querynode" "github.com/milvus-io/milvus/internal/rootcoord" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" - "github.com/milvus-io/milvus/internal/util/healthz" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/trace" "github.com/milvus-io/milvus/internal/util/typeutil" - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" ) var Params paramtable.ComponentParam @@ -71,9 +66,50 @@ func stopRocksmq() { rocksmqimpl.CloseRocksMQ() } +type component interface { + healthz.Indicator + Run() error + Stop() error +} + +func runComponent[T component](ctx context.Context, + localMsg bool, + params *paramtable.ComponentParam, + extraInit func(), + creator func(context.Context, dependency.Factory) (T, error), + metricRegister func(*prometheus.Registry)) T { + var role T + var wg sync.WaitGroup + + wg.Add(1) + go func() { + params.InitOnce() + if extraInit != nil { + extraInit() + } + factory := dependency.NewFactory(localMsg) + var err error + role, err = creator(ctx, factory) + if localMsg { + params.SetLogConfig(typeutil.StandaloneRole) + } else { + params.SetLogConfig(role.GetName()) + } + if err != nil { + panic(err) + } + wg.Done() + _ = role.Run() + }() + wg.Wait() + + healthz.Register(role) + metricRegister(Registry) + return role +} + // MilvusRoles decides which components are brought up with Milvus. type MilvusRoles struct { - HasMultipleRoles bool EnableRootCoord bool `env:"ENABLE_ROOT_COORD"` EnableProxy bool `env:"ENABLE_PROXY"` EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"` @@ -100,258 +136,55 @@ func (mr *MilvusRoles) printLDPreLoad() { } func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *components.RootCoord { - var rc *components.RootCoord - var wg sync.WaitGroup - - wg.Add(1) - go func() { - rootcoord.Params.InitOnce() - if localMsg { - rootcoord.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - rootcoord.Params.SetLogConfig(typeutil.RootCoordRole) - } - factory := dependency.NewFactory(localMsg) - var err error - rc, err = components.NewRootCoord(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: rc}) - } - wg.Done() - _ = rc.Run() - }() - wg.Wait() - - metrics.RegisterRootCoord(Registry) - return rc + return runComponent(ctx, localMsg, &rootcoord.Params, nil, components.NewRootCoord, metrics.RegisterRootCoord) } func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string) *components.Proxy { - var pn *components.Proxy - var wg sync.WaitGroup - - wg.Add(1) - go func() { - proxy.Params.ProxyCfg.InitAlias(alias) - proxy.Params.InitOnce() - if localMsg { - proxy.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - proxy.Params.SetLogConfig(typeutil.ProxyRole) - } - - factory := dependency.NewFactory(localMsg) - var err error - pn, err = components.NewProxy(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: pn}) - } - wg.Done() - _ = pn.Run() - }() - wg.Wait() - - metrics.RegisterProxy(Registry) - return pn + return runComponent(ctx, localMsg, &proxy.Params, + func() { + proxy.Params.ProxyCfg.InitAlias(alias) + }, + components.NewProxy, + metrics.RegisterProxy) } func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *components.QueryCoord { - var qs *components.QueryCoord - var wg sync.WaitGroup - - wg.Add(1) - go func() { - querycoord.Params.InitOnce() - if localMsg { - querycoord.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - querycoord.Params.SetLogConfig(typeutil.QueryCoordRole) - } - - factory := dependency.NewFactory(localMsg) - var err error - qs, err = components.NewQueryCoord(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qs}) - } - wg.Done() - _ = qs.Run() - }() - wg.Wait() - - metrics.RegisterQueryCoord(Registry) - return qs + return runComponent(ctx, localMsg, querycoord.Params, nil, components.NewQueryCoord, metrics.RegisterQueryCoord) } func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias string) *components.QueryNode { - var qn *components.QueryNode - var wg sync.WaitGroup - - wg.Add(1) - go func() { - querynode.Params.QueryNodeCfg.InitAlias(alias) - querynode.Params.InitOnce() - if localMsg { - querynode.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - querynode.Params.SetLogConfig(typeutil.QueryNodeRole) - } - - factory := dependency.NewFactory(localMsg) - var err error - qn, err = components.NewQueryNode(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qn}) - } - wg.Done() - _ = qn.Run() - }() - wg.Wait() - - metrics.RegisterQueryNode(Registry) - return qn + return runComponent(ctx, localMsg, &querynode.Params, + func() { + querynode.Params.QueryNodeCfg.InitAlias(alias) + }, + components.NewQueryNode, + metrics.RegisterQueryNode) } func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *components.DataCoord { - var ds *components.DataCoord - var wg sync.WaitGroup - - wg.Add(1) - go func() { - datacoord.Params.InitOnce() - if localMsg { - datacoord.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - datacoord.Params.SetLogConfig(typeutil.DataCoordRole) - } - - factory := dependency.NewFactory(localMsg) - - dctx := log.WithModule(ctx, "DataCoord") - var err error - ds, err = components.NewDataCoord(dctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: ds}) - } - wg.Done() - _ = ds.Run() - }() - wg.Wait() - - metrics.RegisterDataCoord(Registry) - return ds + return runComponent(ctx, localMsg, &datacoord.Params, nil, components.NewDataCoord, metrics.RegisterDataCoord) } func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias string) *components.DataNode { - var dn *components.DataNode - var wg sync.WaitGroup - - wg.Add(1) - go func() { - datanode.Params.DataNodeCfg.InitAlias(alias) - datanode.Params.InitOnce() - if localMsg { - datanode.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - datanode.Params.SetLogConfig(typeutil.DataNodeRole) - } - - factory := dependency.NewFactory(localMsg) - var err error - dn, err = components.NewDataNode(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: dn}) - } - wg.Done() - _ = dn.Run() - }() - wg.Wait() - - metrics.RegisterDataNode(Registry) - return dn + return runComponent(ctx, localMsg, &datanode.Params, + func() { + datanode.Params.DataNodeCfg.InitAlias(alias) + }, + components.NewDataNode, + metrics.RegisterDataNode) } func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *components.IndexCoord { - var is *components.IndexCoord - var wg sync.WaitGroup - - wg.Add(1) - go func() { - indexcoord.Params.InitOnce() - if localMsg { - indexcoord.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - indexcoord.Params.SetLogConfig(typeutil.IndexCoordRole) - } - - factory := dependency.NewFactory(localMsg) - - var err error - is, err = components.NewIndexCoord(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: is}) - } - wg.Done() - _ = is.Run() - }() - wg.Wait() - - metrics.RegisterIndexCoord(Registry) - return is + return runComponent(ctx, localMsg, &indexcoord.Params, nil, components.NewIndexCoord, metrics.RegisterIndexCoord) } func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias string) *components.IndexNode { - var in *components.IndexNode - var wg sync.WaitGroup - - wg.Add(1) - go func() { - indexnode.Params.IndexNodeCfg.InitAlias(alias) - indexnode.Params.InitOnce() - if localMsg { - indexnode.Params.SetLogConfig(typeutil.StandaloneRole) - } else { - indexnode.Params.SetLogConfig(typeutil.IndexNodeRole) - } - - factory := dependency.NewFactory(localMsg) - - var err error - in, err = components.NewIndexNode(ctx, factory) - if err != nil { - panic(err) - } - if !mr.HasMultipleRoles { - http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: in}) - } - wg.Done() - _ = in.Run() - }() - wg.Wait() - - metrics.RegisterIndexNode(Registry) - return in + return runComponent(ctx, localMsg, &indexnode.Params, + func() { + indexnode.Params.IndexNodeCfg.InitAlias(alias) + }, + components.NewIndexNode, + metrics.RegisterIndexNode) } // Run Milvus components. @@ -462,61 +295,6 @@ func (mr *MilvusRoles) Run(local bool, alias string) { } } - if mr.HasMultipleRoles { - multiRoleHealthzHandler := func(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - req := &milvuspb.GetComponentStatesRequest{} - validateResp := func(resp *milvuspb.ComponentStates, err error) bool { - return err == nil && resp != nil && resp.GetState().GetStateCode() == commonpb.StateCode_Healthy - } - if mr.EnableRootCoord { - if rc == nil || !validateResp(rc.GetComponentStates(ctx, req)) { - rootCoordNotServingHandler(w, r) - return - } - } - - if mr.EnableQueryCoord { - if qs == nil || !validateResp(qs.GetComponentStates(ctx, req)) { - queryCoordNotServingHandler(w, r) - return - } - } - - if mr.EnableDataCoord { - if ds == nil || !validateResp(ds.GetComponentStates(ctx, req)) { - dataCoordNotServingHandler(w, r) - return - } - } - if mr.EnableIndexCoord { - if is == nil || !validateResp(is.GetComponentStates(ctx, req)) { - indexCoordNotServingHandler(w, r) - return - } - } - if mr.EnableProxy { - if pn == nil || !validateResp(pn.GetComponentStates(ctx, req)) { - proxyNotServingHandler(w, r) - return - } - } - // TODO(dragondriver): need to check node state? - - w.WriteHeader(http.StatusOK) - w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) - _, err := fmt.Fprint(w, "OK") - if err != nil { - log.Warn("Failed to send response", - zap.Error(err)) - } - - // TODO(dragondriver): handle component states - } - http.HandleFunc(healthz.HealthzRouterPath, multiRoleHealthzHandler) - } - metrics.Register(Registry) management.ServeHTTP() sc := make(chan os.Signal, 1) diff --git a/go.mod b/go.mod index d59c6d85801c6810039ee7d72c0f871d65fb9059..4bc99c248b8d1a53dd3812827eaed4c6f9911b90 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/BurntSushi/toml v1.0.0 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect - github.com/StackExchange/wmi v1.2.1 // indirect github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e github.com/antonmedv/expr v1.8.9 github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20 @@ -34,12 +33,11 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 github.com/sbinet/npyio v0.6.0 - github.com/shirou/gopsutil v3.21.8+incompatible github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/cast v1.3.1 github.com/spf13/viper v1.8.1 github.com/streamnative/pulsarctl v0.5.0 - github.com/stretchr/testify v1.7.4 + github.com/stretchr/testify v1.8.0 github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c github.com/uber/jaeger-client-go v2.25.0+incompatible go.etcd.io/etcd/api/v3 v3.5.0 @@ -69,6 +67,13 @@ require github.com/sandertv/go-formula/v2 v2.0.0-alpha.7 require github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect +require ( + github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/shirou/gopsutil/v3 v3.22.9 + github.com/yusufpapurcu/wmi v1.2.2 // indirect +) + require ( github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/AthenZ/athenz v1.10.15 // indirect @@ -98,7 +103,7 @@ require ( github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/ghodss/yaml v1.0.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect - github.com/go-ole/go-ole v1.2.5 // indirect + github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-playground/locales v0.13.0 // indirect github.com/go-playground/universal-translator v0.17.0 // indirect github.com/go-playground/validator/v10 v10.4.1 // indirect @@ -155,8 +160,8 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.4.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect - github.com/tklauser/go-sysconf v0.3.9 // indirect - github.com/tklauser/numcpus v0.3.0 // indirect + github.com/tklauser/go-sysconf v0.3.10 // indirect + github.com/tklauser/numcpus v0.4.0 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect github.com/uber/jaeger-lib v2.4.0+incompatible // indirect github.com/ugorji/go/codec v1.1.7 // indirect @@ -181,7 +186,7 @@ require ( golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 // indirect golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect - golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect + golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect @@ -201,10 +206,12 @@ replace ( github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt v3.2.2+incompatible // Fix security alert for jwt-go 3.2.0 github.com/go-kit/kit => github.com/go-kit/kit v0.1.0 +) + +replace ( + // If you want to use the hook interceptor, the following code should be commented out + // and you should modify the api version to be the same as the `so` project. + github.com/milvus-io/milvus/api => ./api github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1 github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect ) - -// If you want to use the hook interceptor, the following code should be commented out -// and you should modify the api version to be the same as the `so` project. -replace github.com/milvus-io/milvus/api => ./api diff --git a/go.sum b/go.sum index a2d774d42d169a1bf131028a1d4c22eb95519297..e44420b62d20091f1173a41c0123a3513497af27 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,6 @@ github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0 github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= -github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA= github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ= github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc= @@ -239,8 +237,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= -github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY= -github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= @@ -324,8 +322,9 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -473,6 +472,8 @@ github.com/linkedin/goavro/v2 v2.11.1 h1:4cuAtbDfqkKnBXp9E+tRkIJGa6W6iAjwonwt8O1 github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA= github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s= github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls= github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= @@ -567,6 +568,8 @@ github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= @@ -617,8 +620,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H github.com/sbinet/npyio v0.6.0 h1:IyqqQIzRjDym9xnIXsToCKei/qCzxDP+Y74KoMlMgXo= github.com/sbinet/npyio v0.6.0/go.mod h1:/q3BNr6dJOy+t6h7RZchTJ0nwRJO52mivaem29WE1j8= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU= -github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= +github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA= +github.com/shirou/gopsutil/v3 v3.22.9/go.mod h1:bBYl1kjgEJpWpxeHmLI+dVHWtyAwfcmSBLDsp2TNT8A= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -668,15 +671,15 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.4 h1:wZRexSlwd7ZXfKINDLsO4r7WBt3gTKONc6K/VesHvHM= -github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= -github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo= -github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= -github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ= -github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= +github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw= +github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk= +github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o= +github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -698,6 +701,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= +github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/zeebo/xxh3 v1.0.1 h1:FMSRIbkrLikb/0hZxmltpg84VkqDAT5M8ufXynuhXsI= github.com/zeebo/xxh3 v1.0.1/go.mod h1:8VHV24/3AZLn3b6Mlp/KuC33LWH687Wq6EnziEB+rsA= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -953,6 +958,7 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -967,14 +973,13 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc= -golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= @@ -1188,8 +1193,6 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A= diff --git a/internal/util/healthz/content_type.go b/internal/management/healthz/content_type.go similarity index 88% rename from internal/util/healthz/content_type.go rename to internal/management/healthz/content_type.go index c596c253754cae547e6de2f418d933e0cabd0f5b..318ce471ea94fcfeb438ec6eddc109075114f60a 100644 --- a/internal/util/healthz/content_type.go +++ b/internal/management/healthz/content_type.go @@ -21,4 +21,6 @@ const ( ContentTypeHeader = "Content-Type" // ContentTypeText is the health check request type text. ContentTypeText = "text/plain" + // ContentTypeJSON is another health check request type text, which response contains more info. + ContentTypeJSON = "application/json" ) diff --git a/cmd/roles/healthz_error_handler.go b/internal/management/healthz/healthz_error_handler.go similarity index 93% rename from cmd/roles/healthz_error_handler.go rename to internal/management/healthz/healthz_error_handler.go index 517e98b17ef2e61b7299daa31c78a5b765f4990f..c65451eb5d82ca38c8678d2f56e1bd69ee2fb68d 100644 --- a/cmd/roles/healthz_error_handler.go +++ b/internal/management/healthz/healthz_error_handler.go @@ -14,21 +14,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -package roles +package healthz import ( "fmt" "net/http" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/util/healthz" "github.com/milvus-io/milvus/internal/util/milvuserrors" "go.uber.org/zap" ) func componentsNotServingHandler(w http.ResponseWriter, r *http.Request, msg string) { w.WriteHeader(http.StatusInternalServerError) - w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) + w.Header().Set(ContentTypeHeader, ContentTypeText) _, err := fmt.Fprint(w, msg) if err != nil { log.Warn("failed to send response", diff --git a/internal/management/healthz/healthz_handler.go b/internal/management/healthz/healthz_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..bc5474a909803126ec7ab8f9030f4db7562ce4ee --- /dev/null +++ b/internal/management/healthz/healthz_handler.go @@ -0,0 +1,113 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package healthz + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/milvus-io/milvus/api/commonpb" + "github.com/milvus-io/milvus/api/milvuspb" + "github.com/milvus-io/milvus/internal/log" + "go.uber.org/zap" +) + +// GetComponentStatesInterface defines the interface that get states from component. +type GetComponentStatesInterface interface { + // GetComponentStates returns the states of component. + GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) +} + +type Indicator interface { + GetName() string + Health(ctx context.Context) commonpb.StateCode +} + +type IndicatorState struct { + Name string `json:"name"` + Code commonpb.StateCode `json:"code"` +} + +type HealthResponse struct { + State string `json:"state"` + Detail []*IndicatorState `json:"detail"` +} + +type HealthHandler struct { + indicators []Indicator +} + +var _ http.Handler = (*HealthHandler)(nil) +var defaultHandler = HealthHandler{} + +func Register(indicator Indicator) { + defaultHandler.indicators = append(defaultHandler.indicators, indicator) +} + +func Handler() *HealthHandler { + return &defaultHandler +} + +func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + resp := &HealthResponse{ + State: "OK", + } + ctx := context.Background() + for _, in := range handler.indicators { + code := in.Health(ctx) + resp.Detail = append(resp.Detail, &IndicatorState{ + Name: in.GetName(), + Code: code, + }) + if code != commonpb.StateCode_Healthy { + resp.State = fmt.Sprintf("component %s state is %s", in.GetName(), code.String()) + } + } + + if resp.State == "OK" { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + // for compatibility + if r.Header.Get(ContentTypeHeader) != ContentTypeJSON { + writeText(w, r, resp.State) + return + } + + writeJSON(w, r, resp) +} + +func writeJSON(w http.ResponseWriter, r *http.Request, resp *HealthResponse) { + w.Header().Set(ContentTypeHeader, ContentTypeJSON) + bs, err := json.Marshal(resp) + if err != nil { + log.Warn("faild to send response", zap.Error(err)) + } + w.Write(bs) +} + +func writeText(w http.ResponseWriter, r *http.Request, reason string) { + w.Header().Set(ContentTypeHeader, ContentTypeText) + _, err := fmt.Fprint(w, reason) + if err != nil { + log.Warn("failed to send response", + zap.Error(err)) + } +} diff --git a/internal/util/healthz/router.go b/internal/management/router.go similarity index 87% rename from internal/util/healthz/router.go rename to internal/management/router.go index 59f4f9ceb9a845141e8013eb5fc853d512c9797e..7036cde0fb3f50cacb94dd6a97dcbdd766f0d8c4 100644 --- a/internal/util/healthz/router.go +++ b/internal/management/router.go @@ -14,7 +14,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -package healthz +package management // HealthzRouterPath is default path for check health state. const HealthzRouterPath = "/healthz" + +// LogLevelRouterPath is path for Get and Update log level at runtime. +const LogLevelRouterPath = "/log/level" diff --git a/internal/management/server.go b/internal/management/server.go index b6a522c60f850a854f9e40ae5b316238caab695c..f4a7166e6ace75f896bd6637a524888b0f6e89c9 100644 --- a/internal/management/server.go +++ b/internal/management/server.go @@ -23,6 +23,7 @@ import ( "strconv" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/management/healthz" "go.uber.org/zap" ) @@ -39,11 +40,15 @@ type HTTPHandler struct { func registerDefaults() { Register(&HTTPHandler{ - Path: "/log/level", + Path: LogLevelRouterPath, HandlerFunc: func(w http.ResponseWriter, req *http.Request) { log.Level().ServeHTTP(w, req) }, }) + Register(&HTTPHandler{ + Path: HealthzRouterPath, + Handler: healthz.Handler(), + }) } func Register(h *HTTPHandler) { diff --git a/internal/management/server_test.go b/internal/management/server_test.go index 5aaf19e0b2d7e18ba6cadbbc4a9806ad38b05555..449c84d4e346cc21766014e703ffff84313730b7 100644 --- a/internal/management/server_test.go +++ b/internal/management/server_test.go @@ -18,14 +18,18 @@ package management import ( "bytes" + "context" "encoding/json" "io/ioutil" "net/http" "net/http/httptest" "testing" + "github.com/milvus-io/milvus/api/commonpb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/management/healthz" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" "go.uber.org/zap" ) @@ -36,34 +40,45 @@ func TestGetHTTPAddr(t *testing.T) { assert.Equal(t, getHTTPAddr(), ":"+testPort) } -func TestDefaultLogHandler(t *testing.T) { - httpServer := httptest.NewServer(nil) - defer httpServer.Close() +type HTTPServerTestSuite struct { + suite.Suite + server *httptest.Server +} + +func (suite *HTTPServerTestSuite) SetupSuite() { + suite.server = httptest.NewServer(nil) registerDefaults() +} + +func (suite *HTTPServerTestSuite) TearDownSuite() { + defer suite.server.Close() +} + +func (suite *HTTPServerTestSuite) TestDefaultLogHandler() { log.SetLevel(zap.DebugLevel) - assert.Equal(t, zap.DebugLevel, log.GetLevel()) + suite.Equal(zap.DebugLevel, log.GetLevel()) // replace global logger, log change will not be affected. conf := &log.Config{Level: "info", File: log.FileLogConfig{}, DisableTimestamp: true} logger, p, _ := log.InitLogger(conf) log.ReplaceGlobals(logger, p) - assert.Equal(t, zap.InfoLevel, log.GetLevel()) + suite.Equal(zap.InfoLevel, log.GetLevel()) // change log level through http - payload, err := json.Marshal(map[string]interface{}{"level": "error"}) + payload, err := json.Marshal(map[string]any{"level": "error"}) if err != nil { log.Fatal(err.Error()) } - url := httpServer.URL + "/log/level" + url := suite.server.URL + "/log/level" req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payload)) req.Header.Set("Content-Type", "application/json") if err != nil { log.Fatal(err.Error()) } - client := httpServer.Client() + client := suite.server.Client() resp, err := client.Do(req) if err != nil { log.Fatal(err.Error()) @@ -74,6 +89,54 @@ func TestDefaultLogHandler(t *testing.T) { if err != nil { log.Fatal(err.Error()) } - assert.Equal(t, "{\"level\":\"error\"}\n", string(body)) - assert.Equal(t, zap.ErrorLevel, log.GetLevel()) + suite.Equal("{\"level\":\"error\"}\n", string(body)) + suite.Equal(zap.ErrorLevel, log.GetLevel()) +} + +func (suite *HTTPServerTestSuite) TestHealthzHandler() { + url := suite.server.URL + "/healthz" + client := suite.server.Client() + + healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy}) + + req, _ := http.NewRequest(http.MethodGet, url, nil) + resp, err := client.Do(req) + suite.Nil(err) + defer resp.Body.Close() + body, _ := ioutil.ReadAll(resp.Body) + suite.Equal("OK", string(body)) + + req, _ = http.NewRequest(http.MethodGet, url, nil) + req.Header.Set("Content-Type", "application/json") + resp, err = client.Do(req) + suite.Nil(err) + defer resp.Body.Close() + body, _ = ioutil.ReadAll(resp.Body) + suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body)) + + healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal}) + req, _ = http.NewRequest(http.MethodGet, url, nil) + req.Header.Set("Content-Type", "application/json") + resp, err = client.Do(req) + suite.Nil(err) + defer resp.Body.Close() + body, _ = ioutil.ReadAll(resp.Body) + suite.Equal("{\"state\":\"component m2 state is Abnormal\",\"detail\":[{\"name\":\"m1\",\"code\":1},{\"name\":\"m2\",\"code\":2}]}", string(body)) +} + +func TestHTTPServerSuite(t *testing.T) { + suite.Run(t, new(HTTPServerTestSuite)) +} + +type MockIndicator struct { + name string + code commonpb.StateCode +} + +func (m *MockIndicator) Health(ctx context.Context) commonpb.StateCode { + return m.code +} + +func (m *MockIndicator) GetName() string { + return m.name } diff --git a/internal/util/metricsinfo/hardware_info.go b/internal/util/metricsinfo/hardware_info.go index e1822865d9c11603128c440844041deb4c8356d3..3421422ccb6df1bdce1f4983a71c32baadaa3ca4 100644 --- a/internal/util/metricsinfo/hardware_info.go +++ b/internal/util/metricsinfo/hardware_info.go @@ -14,8 +14,8 @@ package metricsinfo import ( "sync" - "github.com/shirou/gopsutil/cpu" - "github.com/shirou/gopsutil/mem" + "github.com/shirou/gopsutil/v3/cpu" + "github.com/shirou/gopsutil/v3/mem" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" diff --git a/internal/util/paramtable/component_param.go b/internal/util/paramtable/component_param.go index d1d6901150d58be5a8248fa3113f32edbe75ff28..a32910e0077fd52c2d17a096a05d19fcb0020189 100644 --- a/internal/util/paramtable/component_param.go +++ b/internal/util/paramtable/component_param.go @@ -21,7 +21,7 @@ import ( "sync/atomic" "time" - "github.com/shirou/gopsutil/disk" + "github.com/shirou/gopsutil/v3/disk" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log"