未验证 提交 89748ac6 编写于 作者: D dragondriver 提交者: GitHub

Expose metrics of data cluster (#7177)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 7c4b15e2
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"errors"
"fmt"
)
func msgDataCoordIsUnhealthy(coordID UniqueID) string {
return fmt.Sprintf("data coord %d is not ready", coordID)
}
func errDataCoordIsUnhealthy(coordID UniqueID) error {
return errors.New(msgDataCoordIsUnhealthy(coordID))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestMsgDataCoordIsUnhealthy(t *testing.T) {
nodeIDList := []typeutil.UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestMsgDataCoordIsUnhealthy", zap.String("msg", msgDataCoordIsUnhealthy(nodeID)))
}
}
func TestErrDataCoordIsUnhealthy(t *testing.T) {
nodeIDList := []typeutil.UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestErrDataCoordIsUnhealthy", zap.Error(errDataCoordIsUnhealthy(nodeID)))
}
}
......@@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"go.uber.org/zap"
)
......@@ -430,3 +431,69 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
Segments: ret,
}, nil
}
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.Debug("DataCoord.GetMetrics",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request))
if s.isClosed() {
log.Warn("DataCoord.GetMetrics failed",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.Error(errDataCoordIsUnhealthy(Params.NodeID)))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataCoordIsUnhealthy(Params.NodeID),
},
Response: "",
}, nil
}
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("DataCoord.GetMetrics failed to parse metric type",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
}, nil
}
log.Debug("DataCoord.GetMetrics",
zap.String("metric_type", metricType))
if metricType == metricsinfo.SystemInfoMetrics {
metrics, err := s.getSystemInfoMetrics(ctx, req)
log.Debug("DataCoord.GetMetrics",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
zap.Error(err))
return metrics, err
}
log.Debug("DataCoord.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
}, nil
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datacoord
import (
"context"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
)
// TODO(dragondriver): add more detail metrics
func (s *Server) getSystemInfoMetrics(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
) (*milvuspb.GetMetricsResponse, error) {
clusterTopology := metricsinfo.DataClusterTopology{
Self: metricsinfo.DataCoordInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID),
},
},
ConnectedNodes: make([]metricsinfo.DataNodeInfos, 0),
}
nodes := s.cluster.GetNodes()
for _, node := range nodes {
metrics, err := node.GetClient().GetMetrics(ctx, req)
if err != nil {
log.Warn("invalid metrics of query node was found",
zap.Error(err))
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, metricsinfo.DataNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
HasError: true,
ErrorReason: err.Error(),
// Name doesn't matter here cause we can't get it when error occurs, using address as the Name?
Name: "",
},
})
continue
}
if metrics.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("invalid metrics of query node was found",
zap.Any("error_code", metrics.Status.ErrorCode),
zap.Any("error_reason", metrics.Status.Reason))
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, metricsinfo.DataNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
HasError: true,
ErrorReason: metrics.Status.Reason,
Name: metrics.ComponentName,
},
})
continue
}
infos := metricsinfo.DataNodeInfos{}
err = metricsinfo.UnmarshalComponentInfos(metrics.Response, &infos)
if err != nil {
log.Warn("invalid metrics of query node was found",
zap.Error(err))
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, metricsinfo.DataNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
HasError: true,
ErrorReason: err.Error(),
Name: metrics.ComponentName,
},
})
continue
}
clusterTopology.ConnectedNodes = append(clusterTopology.ConnectedNodes, infos)
}
coordTopology := metricsinfo.DataCoordTopology{
Cluster: clusterTopology,
Connections: metricsinfo.ConnTopology{
Name: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID),
// TODO(dragondriver): connection info
},
}
resp, err := metricsinfo.MarshalTopology(coordTopology)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID),
}, nil
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, Params.NodeID),
}, nil
}
......@@ -15,6 +15,9 @@ import (
"sync/atomic"
"time"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/util/tsoutil"
......@@ -311,3 +314,34 @@ func (m *mockRootCoordService) SegmentFlushCompleted(ctx context.Context, in *da
func (m *mockRootCoordService) AddNewSegment(ctx context.Context, in *datapb.SegmentMsg) (*commonpb.Status, error) {
panic("not implemented") // TODO: Implement
}
func (c *mockDataNodeClient) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
// TODO(dragondriver): change the id, though it's not important in ut
nodeID := UniqueID(20210819)
nodeInfos := metricsinfo.DataNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
},
}
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
}, nil
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, nodeID),
}, nil
}
......@@ -15,9 +15,17 @@ import (
"os"
"path"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
......@@ -279,6 +287,71 @@ func TestGetSegmentInfo(t *testing.T) {
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
}
func TestServer_GetMetrics(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
var err error
// server is closed
stateSave := atomic.LoadInt64(&svr.isServing)
atomic.StoreInt64(&svr.isServing, ServerStateInitializing)
resp, err := svr.GetMetrics(svr.ctx, &milvuspb.GetMetricsRequest{})
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
atomic.StoreInt64(&svr.isServing, stateSave)
// failed to parse metric type
invalidRequest := "invalid request"
resp, err = svr.GetMetrics(svr.ctx, &milvuspb.GetMetricsRequest{
Request: invalidRequest,
})
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// unsupported metric type
unsupportedMetricType := "unsupported"
req, err := metricsinfo.ConstructRequestByMetricType(unsupportedMetricType)
assert.Nil(t, err)
resp, err = svr.GetMetrics(svr.ctx, req)
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// normal case
req, err = metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.Nil(t, err)
resp, err = svr.GetMetrics(svr.ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
log.Info("TestServer_GetMetrics",
zap.String("name", resp.ComponentName),
zap.String("response", resp.Response))
}
func TestServer_getSystemInfoMetrics(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.Nil(t, err)
resp, err := svr.getSystemInfoMetrics(svr.ctx, req)
assert.Nil(t, err)
log.Info("TestServer_getSystemInfoMetrics",
zap.String("name", resp.ComponentName),
zap.String("response", resp.Response))
var coordTopology metricsinfo.DataCoordTopology
err = metricsinfo.UnmarshalTopology(resp.Response, &coordTopology)
assert.Nil(t, err)
assert.Equal(t, len(svr.cluster.GetNodes()), len(coordTopology.Cluster.ConnectedNodes))
for _, nodeMetrics := range coordTopology.Cluster.ConnectedNodes {
assert.Equal(t, false, nodeMetrics.HasError)
assert.Equal(t, 0, len(nodeMetrics.ErrorReason))
_, err = metricsinfo.MarshalComponentInfos(nodeMetrics)
assert.Nil(t, err)
}
}
func TestChannel(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
......
......@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -348,6 +349,11 @@ func (node *DataNode) UpdateStateCode(code internalpb.StateCode) {
node.State.Store(code)
}
func (node *DataNode) isHealthy() bool {
code := node.State.Load().(internalpb.StateCode)
return code == internalpb.StateCode_Healthy
}
// WatchDmChannels create a new dataSyncService for every unique dmlVchannel name, ignore if dmlVchannel existed.
func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
metrics.DataNodeWatchDmChannelsCounter.WithLabelValues(MetricRequestsTotal).Inc()
......@@ -569,3 +575,70 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
Value: "",
}, nil
}
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.Debug("DataNode.GetMetrics",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request))
if !node.isHealthy() {
log.Warn("DataNode.GetMetrics failed",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.Error(errDataNodeIsUnhealthy(Params.NodeID)))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataNodeIsUnhealthy(Params.NodeID),
},
Response: "",
}, nil
}
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
log.Warn("DataNode.GetMetrics failed to parse metric type",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
}, nil
}
log.Debug("DataNode.GetMetrics",
zap.String("metric_type", metricType))
if metricType == metricsinfo.SystemInfoMetrics {
systemInfoMetrics, err := node.getSystemInfoMetrics(ctx, req)
log.Debug("DataNode.GetMetrics",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.String("metric_type", metricType),
zap.Any("systemInfoMetrics", systemInfoMetrics), // TODO(dragondriver): necessary? may be very large
zap.Error(err))
return systemInfoMetrics, err
}
log.Debug("DataNode.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("node_id", Params.NodeID),
zap.String("req", req.Request),
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
}, nil
}
......@@ -21,6 +21,10 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
......@@ -209,6 +213,52 @@ func TestDataNode(t *testing.T) {
assert.NoError(t, err)
})
t.Run("Test getSystemInfoMetrics", func(t *testing.T) {
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.NoError(t, err)
resp, err := node.getSystemInfoMetrics(node.ctx, req)
assert.NoError(t, err)
log.Info("Test DataNode.getSystemInfoMetrics",
zap.String("name", resp.ComponentName),
zap.String("response", resp.Response))
})
t.Run("Test GetMetrics", func(t *testing.T) {
// server is closed
stateSave := node.State.Load().(internalpb.StateCode)
node.State.Store(internalpb.StateCode_Abnormal)
resp, err := node.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
node.State.Store(stateSave)
// failed to parse metric type
invalidRequest := "invalid request"
resp, err = node.GetMetrics(ctx, &milvuspb.GetMetricsRequest{
Request: invalidRequest,
})
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// unsupported metric type
unsupportedMetricType := "unsupported"
req, err := metricsinfo.ConstructRequestByMetricType(unsupportedMetricType)
assert.NoError(t, err)
resp, err = node.GetMetrics(ctx, req)
assert.NoError(t, err)
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// normal case
req, err = metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.NoError(t, err)
resp, err = node.GetMetrics(node.ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
log.Info("Test DataNode.GetMetrics",
zap.String("name", resp.ComponentName),
zap.String("response", resp.Response))
})
t.Run("Test BackGroundGC", func(te *testing.T) {
te.Skipf("issue #6574")
ctx, cancel := context.WithCancel(context.Background())
......
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"errors"
"fmt"
)
func msgDataNodeIsUnhealthy(nodeID UniqueID) string {
return fmt.Sprintf("data node %d is not ready", nodeID)
}
func errDataNodeIsUnhealthy(nodeID UniqueID) error {
return errors.New(msgDataNodeIsUnhealthy(nodeID))
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestMsgDataNodeIsUnhealthy(t *testing.T) {
nodeIDList := []typeutil.UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestMsgDataNodeIsUnhealthy", zap.String("msg", msgDataNodeIsUnhealthy(nodeID)))
}
}
func TestErrDataNodeIsUnhealthy(t *testing.T) {
nodeIDList := []typeutil.UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestErrDataNodeIsUnhealthy", zap.Error(errDataNodeIsUnhealthy(nodeID)))
}
}
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"context"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
func (node *DataNode) getSystemInfoMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
// TODO(dragondriver): add more metrics
nodeInfos := metricsinfo.DataNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, Params.NodeID),
},
}
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, Params.NodeID),
}, nil
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.DataNodeRole, Params.NodeID),
}, nil
}
......@@ -255,3 +255,10 @@ func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
})
return ret.(*datapb.GetFlushedSegmentsResponse), err
}
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetMetrics(ctx, req)
})
return ret.(*milvuspb.GetMetricsResponse), err
}
......@@ -233,3 +233,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
return s.dataCoord.GetFlushedSegments(ctx, req)
}
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
return s.dataCoord.GetMetrics(ctx, req)
}
......@@ -169,3 +169,10 @@ func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsReq
})
return ret.(*commonpb.Status), err
}
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpc.GetMetrics(ctx, req)
})
return ret.(*milvuspb.GetMetricsResponse), err
}
......@@ -272,3 +272,7 @@ func (s *Server) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsReq
}
return s.datanode.FlushSegments(ctx, req)
}
func (s *Server) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
return s.datanode.GetMetrics(ctx, request)
}
......@@ -30,6 +30,9 @@ service DataCoord {
rpc SaveBinlogPaths(SaveBinlogPathsRequest) returns (common.Status){}
rpc GetRecoveryInfo(GetRecoveryInfoRequest) returns (GetRecoveryInfoResponse){}
rpc GetFlushedSegments(GetFlushedSegmentsRequest) returns(GetFlushedSegmentsResponse){}
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
}
service DataNode {
......@@ -38,6 +41,9 @@ service DataNode {
rpc WatchDmChannels(WatchDmChannelsRequest) returns (common.Status) {}
rpc FlushSegments(FlushSegmentsRequest) returns(common.Status) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
}
message FlushRequest {
......
......@@ -43,6 +43,8 @@ type DataNode interface {
WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error)
FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error)
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
}
type DataCoord interface {
......
......@@ -68,6 +68,18 @@ type IndexCoordInfos struct {
// TODO(dragondriver): add more detail metrics
}
// DataNodeInfos implements ComponentInfos
type DataNodeInfos struct {
BaseComponentInfos
// TODO(dragondriver): add more detail metrics
}
// DataCoordInfos implements ComponentInfos
type DataCoordInfos struct {
BaseComponentInfos
// TODO(dragondriver): add more detail metrics
}
// RootCoordInfos implements ComponentInfos
type RootCoordInfos struct {
BaseComponentInfos
......
......@@ -100,6 +100,38 @@ func TestIndexCoordInfos_Codec(t *testing.T) {
assert.Equal(t, infos1.Name, infos2.Name)
}
func TestDataNodeInfos_Codec(t *testing.T) {
infos1 := DataNodeInfos{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataNodeRole, 1),
},
}
s, err := MarshalComponentInfos(infos1)
assert.Equal(t, nil, err)
log.Info("TestDataNodeInfos_Codec",
zap.String("marshaled_result", s))
var infos2 DataNodeInfos
err = UnmarshalComponentInfos(s, &infos2)
assert.Equal(t, nil, err)
assert.Equal(t, infos1.Name, infos2.Name)
}
func TestDataCoordInfos_Codec(t *testing.T) {
infos1 := DataCoordInfos{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataCoordRole, 1),
},
}
s, err := MarshalComponentInfos(infos1)
assert.Equal(t, nil, err)
log.Info("TestDataCoordInfos_Codec",
zap.String("marshaled_result", s))
var infos2 DataCoordInfos
err = UnmarshalComponentInfos(s, &infos2)
assert.Equal(t, nil, err)
assert.Equal(t, infos1.Name, infos2.Name)
}
func TestRootCoordInfos_Codec(t *testing.T) {
infos1 := RootCoordInfos{
BaseComponentInfos: BaseComponentInfos{
......
......@@ -75,6 +75,18 @@ type IndexCoordTopology struct {
Connections ConnTopology `json:"connections"`
}
// DataClusterTopology shows the topology between DataCoord and DataNodes
type DataClusterTopology struct {
Self DataCoordInfos `json:"self"`
ConnectedNodes []DataNodeInfos `json:"connected_nodes"`
}
// DataCoordTopology shows the whole metrics of index cluster
type DataCoordTopology struct {
Cluster DataClusterTopology `json:"cluster"`
Connections ConnTopology `json:"connections"`
}
// RootCoordTopology shows the whole metrics of root coordinator
type RootCoordTopology struct {
Self RootCoordInfos `json:"self"`
......
......@@ -200,6 +200,87 @@ func TestIndexCoordTopology_Codec(t *testing.T) {
}
}
func TestDataClusterTopology_Codec(t *testing.T) {
topology1 := DataClusterTopology{
Self: DataCoordInfos{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataCoordRole, 1),
},
},
ConnectedNodes: []DataNodeInfos{
{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataNodeRole, 1),
},
},
{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataNodeRole, 2),
},
},
},
}
s, err := MarshalTopology(topology1)
assert.Equal(t, nil, err)
log.Info("TestDataClusterTopology_Codec",
zap.String("marshaled_result", s))
var topology2 DataClusterTopology
err = UnmarshalTopology(s, &topology2)
assert.Equal(t, nil, err)
assert.Equal(t, topology1.Self, topology2.Self)
assert.Equal(t, len(topology1.ConnectedNodes), len(topology2.ConnectedNodes))
for i := range topology1.ConnectedNodes {
assert.Equal(t, topology1.ConnectedNodes[i], topology2.ConnectedNodes[i])
}
}
func TestDataCoordTopology_Codec(t *testing.T) {
topology1 := DataCoordTopology{
Cluster: DataClusterTopology{
Self: DataCoordInfos{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataCoordRole, 1),
},
},
ConnectedNodes: []DataNodeInfos{
{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataNodeRole, 1),
},
},
{
BaseComponentInfos: BaseComponentInfos{
Name: ConstructComponentName(typeutil.DataNodeRole, 2),
},
},
},
},
Connections: ConnTopology{
Name: ConstructComponentName(typeutil.DataCoordRole, 1),
ConnectedComponents: []string{
ConstructComponentName(typeutil.RootCoordRole, 1),
},
},
}
s, err := MarshalTopology(topology1)
assert.Equal(t, nil, err)
log.Info("TestDataCoordTopology_Codec",
zap.String("marshaled_result", s))
var topology2 DataCoordTopology
err = UnmarshalTopology(s, &topology2)
assert.Equal(t, nil, err)
assert.Equal(t, topology1.Cluster.Self, topology2.Cluster.Self)
assert.Equal(t, len(topology1.Cluster.ConnectedNodes), len(topology2.Cluster.ConnectedNodes))
for i := range topology1.Cluster.ConnectedNodes {
assert.Equal(t, topology1.Cluster.ConnectedNodes[i], topology2.Cluster.ConnectedNodes[i])
}
assert.Equal(t, topology1.Connections.Name, topology2.Connections.Name)
assert.Equal(t, len(topology1.Connections.ConnectedComponents), len(topology1.Connections.ConnectedComponents))
for i := range topology1.Connections.ConnectedComponents {
assert.Equal(t, topology1.Connections.ConnectedComponents[i], topology2.Connections.ConnectedComponents[i])
}
}
func TestRootCoordTopology_Codec(t *testing.T) {
topology1 := RootCoordTopology{
Self: RootCoordInfos{
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册