diff --git a/cmd/components/data_coord.go b/cmd/components/data_coord.go index bf9284ecf87b0ccb1ba6259643269a3e562e73bb..be6577d1af725d694d01af7a985fcd3f03a22cdf 100644 --- a/cmd/components/data_coord.go +++ b/cmd/components/data_coord.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord" "github.com/milvus-io/milvus/internal/msgstream" ) @@ -51,3 +53,7 @@ func (s *DataCoord) Stop() error { } return nil } + +func (s *DataCoord) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return s.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/data_node.go b/cmd/components/data_node.go index b36bba67adf9ae39dd70c719dda466bd9cc905ae..72c8fed270f61a50abe4df9e95d1249579c96293 100644 --- a/cmd/components/data_node.go +++ b/cmd/components/data_node.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/msgstream" @@ -53,3 +55,7 @@ func (d *DataNode) Stop() error { } return nil } + +func (d *DataNode) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return d.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/index_coord.go b/cmd/components/index_coord.go index dfd4f5584f874e20b3a0f19796b383274b0a5799..d49c66a42d11876cbfdc2a749efb09a9121c5d80 100644 --- a/cmd/components/index_coord.go +++ b/cmd/components/index_coord.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcindexcoord "github.com/milvus-io/milvus/internal/distributed/indexcoord" ) @@ -49,3 +51,7 @@ func (s *IndexCoord) Stop() error { } return nil } + +func (s *IndexCoord) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return s.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/index_node.go b/cmd/components/index_node.go index 8cdd8a07c5b4bcb633e02629944dbb2e4d19bf37..701969d27bb1ef6531fe94382a7d36894ce85071 100644 --- a/cmd/components/index_node.go +++ b/cmd/components/index_node.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" ) @@ -49,3 +51,7 @@ func (n *IndexNode) Stop() error { } return nil } + +func (n *IndexNode) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return n.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/proxy.go b/cmd/components/proxy.go index d2aa8ec4ed36ba8af0f50c00e1d3dccbe828159c..e51810f0bce14cb55489423d2e471fb11ee90b01 100644 --- a/cmd/components/proxy.go +++ b/cmd/components/proxy.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy" "github.com/milvus-io/milvus/internal/msgstream" ) @@ -50,3 +52,7 @@ func (n *Proxy) Stop() error { } return nil } + +func (n *Proxy) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return n.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/query_coord.go b/cmd/components/query_coord.go index 41f2110d08207387fc048643e0a7b9b83ea85229..b87de9d53e6cf8d895454eade6831efdfc7c0091 100644 --- a/cmd/components/query_coord.go +++ b/cmd/components/query_coord.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord" "github.com/milvus-io/milvus/internal/msgstream" ) @@ -51,3 +53,7 @@ func (qs *QueryCoord) Stop() error { } return nil } + +func (qs *QueryCoord) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return qs.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/query_node.go b/cmd/components/query_node.go index 1092b03d632c1c2893b5b4d5965d8eff68510ff8..698ac1ec57ed22df9edd41d71183153184de4d3b 100644 --- a/cmd/components/query_node.go +++ b/cmd/components/query_node.go @@ -14,6 +14,8 @@ package components import ( "context" + "github.com/milvus-io/milvus/internal/proto/internalpb" + grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode" "github.com/milvus-io/milvus/internal/msgstream" ) @@ -52,3 +54,7 @@ func (q *QueryNode) Stop() error { } return nil } + +func (q *QueryNode) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return q.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/components/root_coord.go b/cmd/components/root_coord.go index 9c91be745a9da9e597a2693ee2d5e737b804c3ce..64e0bbf197ee0dc5fb6e9db6dd6cdef76330c6c3 100644 --- a/cmd/components/root_coord.go +++ b/cmd/components/root_coord.go @@ -15,6 +15,8 @@ import ( "context" "io" + "github.com/milvus-io/milvus/internal/proto/internalpb" + rc "github.com/milvus-io/milvus/internal/distributed/rootcoord" "github.com/milvus-io/milvus/internal/msgstream" "github.com/opentracing/opentracing-go" @@ -55,3 +57,7 @@ func (rc *RootCoord) Stop() error { } return nil } + +func (rc *RootCoord) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) { + return rc.svr.GetComponentStates(ctx, request) +} diff --git a/cmd/roles/healthz_error_handler.go b/cmd/roles/healthz_error_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..139ae700c3c65bf8e654e227dfc9536b0daee31a --- /dev/null +++ b/cmd/roles/healthz_error_handler.go @@ -0,0 +1,48 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package roles + +import ( + "fmt" + "net/http" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/util/healthz" + "github.com/milvus-io/milvus/internal/util/milvuserrors" + "go.uber.org/zap" +) + +func componentsNotServingHandler(w http.ResponseWriter, r *http.Request, msg string) { + w.WriteHeader(http.StatusInternalServerError) + w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) + _, err := fmt.Fprint(w, msg) + if err != nil { + log.Warn("failed to send response", + zap.Error(err)) + } +} + +func rootCoordNotServingHandler(w http.ResponseWriter, r *http.Request) { + componentsNotServingHandler(w, r, milvuserrors.MsgRootCoordNotServing) +} + +func queryCoordNotServingHandler(w http.ResponseWriter, r *http.Request) { + componentsNotServingHandler(w, r, milvuserrors.MsgQueryCoordNotServing) +} + +func dataCoordNotServingHandler(w http.ResponseWriter, r *http.Request) { + componentsNotServingHandler(w, r, milvuserrors.MsgDataCoordNotServing) +} + +func indexCoordNotServingHandler(w http.ResponseWriter, r *http.Request) { + componentsNotServingHandler(w, r, milvuserrors.MsgIndexCoordNotServing) +} diff --git a/cmd/roles/healthz_handler.go b/cmd/roles/healthz_handler.go new file mode 100644 index 0000000000000000000000000000000000000000..d025e5c05d867215604bb2f537c9d9db7e18b1fe --- /dev/null +++ b/cmd/roles/healthz_handler.go @@ -0,0 +1,81 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package roles + +import ( + "context" + "fmt" + "net/http" + + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/proto/commonpb" + "github.com/milvus-io/milvus/internal/util/healthz" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/internalpb" +) + +func unhealthyHandler(w http.ResponseWriter, r *http.Request, reason string) { + w.WriteHeader(http.StatusInternalServerError) + w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) + _, err := fmt.Fprint(w, reason) + if err != nil { + log.Warn("failed to send response", + zap.Error(err)) + } +} + +func healthyHandler(w http.ResponseWriter, r *http.Request) { + var err error + + w.WriteHeader(http.StatusOK) + w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) + _, err = fmt.Fprint(w, "OK") + if err != nil { + log.Warn("failed to send response", + zap.Error(err)) + } +} + +type GetComponentStatesInterface interface { + GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) +} + +type componentsHealthzHandler struct { + component GetComponentStatesInterface +} + +func (handler *componentsHealthzHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + states, err := handler.component.GetComponentStates(context.Background(), &internalpb.GetComponentStatesRequest{}) + + if err != nil { + unhealthyHandler(w, r, err.Error()) + return + } + + if states == nil { + unhealthyHandler(w, r, "failed to get states") + return + } + + if states.Status == nil { + unhealthyHandler(w, r, "failed to get status") + return + } + + if states.Status.ErrorCode != commonpb.ErrorCode_Success { + unhealthyHandler(w, r, states.Status.Reason) + return + } + + healthyHandler(w, r) +} diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index fe3e87c4ab0cf55601712ba7050682a6a6909977..206666c886c3bf5e779c7c8175c3fd6816f7b837 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -14,6 +14,7 @@ package roles import ( "context" "fmt" + "net/http" "os" "os/signal" "path" @@ -21,6 +22,9 @@ import ( "sync" "syscall" + "github.com/milvus-io/milvus/internal/util/healthz" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/cmd/components" @@ -97,6 +101,9 @@ func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *compone } wg.Done() _ = rc.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: rc}) + } }() wg.Wait() @@ -126,6 +133,9 @@ func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string } wg.Done() _ = pn.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: pn}) + } }() wg.Wait() @@ -154,6 +164,9 @@ func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *compon } wg.Done() _ = qs.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qs}) + } }() wg.Wait() @@ -183,6 +196,9 @@ func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias st } wg.Done() _ = qn.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qn}) + } }() wg.Wait() @@ -211,6 +227,9 @@ func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *compone } wg.Done() _ = ds.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: ds}) + } }() wg.Wait() @@ -240,6 +259,9 @@ func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias str } wg.Done() _ = dn.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: dn}) + } }() wg.Wait() @@ -267,6 +289,9 @@ func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *compon } wg.Done() _ = is.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: is}) + } }() wg.Wait() @@ -295,6 +320,9 @@ func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias st } wg.Done() _ = in.Run() + if !localMsg { + http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: in}) + } }() wg.Wait() @@ -417,6 +445,39 @@ func (mr *MilvusRoles) Run(localMsg bool, alias string) { } } + if localMsg { + standaloneHealthzHandler := func(w http.ResponseWriter, r *http.Request) { + if rc == nil { + rootCoordNotServingHandler(w, r) + return + } + if qs == nil { + queryCoordNotServingHandler(w, r) + return + } + if ds == nil { + dataCoordNotServingHandler(w, r) + return + } + if is == nil { + indexCoordNotServingHandler(w, r) + return + } + // TODO(dragondriver): need to check node state? + + w.WriteHeader(http.StatusOK) + w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText) + _, err := fmt.Fprint(w, "OK") + if err != nil { + log.Warn("failed to send response", + zap.Error(err)) + } + + // TODO(dragondriver): handle component states + } + http.HandleFunc(healthz.HealthzRouterPath, standaloneHealthzHandler) + } + metrics.ServeHTTP() sc := make(chan os.Signal, 1) diff --git a/internal/util/healthz/content_type.go b/internal/util/healthz/content_type.go new file mode 100644 index 0000000000000000000000000000000000000000..5976c1a2923673ec4918726e1cd8c9b3c5e067d9 --- /dev/null +++ b/internal/util/healthz/content_type.go @@ -0,0 +1,17 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package healthz + +const ( + ContentTypeHeader = "Content-Type" + ContentTypeText = "text/plain" +) diff --git a/internal/util/healthz/router.go b/internal/util/healthz/router.go new file mode 100644 index 0000000000000000000000000000000000000000..79aa5f727869b2393b67431babbfa7d3cd6de336 --- /dev/null +++ b/internal/util/healthz/router.go @@ -0,0 +1,14 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +package healthz + +const HealthzRouterPath = "healthz" diff --git a/internal/util/milvuserrors/errors.go b/internal/util/milvuserrors/errors.go index c4c1a3e9e8149ad74578eeae325a10ac7d867d33..705260cba474e24dba0c6379eccebc3c473c9c97 100644 --- a/internal/util/milvuserrors/errors.go +++ b/internal/util/milvuserrors/errors.go @@ -16,6 +16,13 @@ import ( "fmt" ) +const ( + MsgRootCoordNotServing = "root coordinator is not serving" + MsgQueryCoordNotServing = "query coordinator is not serving" + MsgDataCoordNotServing = "data coordinator is not serving" + MsgIndexCoordNotServing = "index coordinator is not serving" +) + func MsgCollectionAlreadyExist(name string) string { return fmt.Sprintf("Collection %s already exist", name) }