未验证 提交 cba0feb1 编写于 作者: W wei liu 提交者: GitHub

add coordinator broker, to unify rootcoord api access (#25187)

Signed-off-by: NWei Liu <wei.liu@zilliz.com>
上级 d7d61f52
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"time"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"go.uber.org/zap"
)
const (
brokerRPCTimeout = 5 * time.Second
)
type Broker interface {
DescribeCollectionInternal(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error)
ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error)
ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error)
ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error)
HasCollection(ctx context.Context, collectionID int64) (bool, error)
}
type CoordinatorBroker struct {
rootCoord types.RootCoord
}
func NewCoordinatorBroker(rootCoord types.RootCoord) *CoordinatorBroker {
return &CoordinatorBroker{
rootCoord: rootCoord,
}
}
func (b *CoordinatorBroker) DescribeCollectionInternal(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error) {
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
defer cancel()
resp, err := b.rootCoord.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
})
if err = VerifyResponse(resp, err); err != nil {
log.Error("DescribeCollectionInternal failed",
zap.Int64("collectionID", collectionID),
zap.Error(err))
return nil, err
}
return resp, nil
}
func (b *CoordinatorBroker) ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error) {
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
defer cancel()
resp, err := b.rootCoord.ShowPartitionsInternal(ctx, &milvuspb.ShowPartitionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
})
if err = VerifyResponse(resp, err); err != nil {
log.Error("ShowPartitionsInternal failed",
zap.Int64("collectionID", collectionID),
zap.Error(err))
return nil, err
}
return resp.PartitionIDs, nil
}
func (b *CoordinatorBroker) ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error) {
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
defer cancel()
resp, err := b.rootCoord.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
),
DbName: dbName,
})
if err = VerifyResponse(resp, err); err != nil {
log.Warn("ShowCollections failed",
zap.String("dbName", dbName),
zap.Error(err))
return nil, err
}
return resp, nil
}
func (b *CoordinatorBroker) ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error) {
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
defer cancel()
resp, err := b.rootCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)),
})
if err = VerifyResponse(resp, err); err != nil {
log.Warn("failed to ListDatabases", zap.Error(err))
return nil, err
}
return resp, nil
}
// HasCollection communicates with RootCoord and check whether this collection exist from the user's perspective.
func (b *CoordinatorBroker) HasCollection(ctx context.Context, collectionID int64) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout)
defer cancel()
resp, err := b.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
})
if err != nil {
return false, err
}
if resp == nil {
return false, errNilResponse
}
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
return true, nil
}
statusErr := common.NewStatusError(resp.Status.ErrorCode, resp.Status.Reason)
if common.IsCollectionNotExistError(statusErr) {
return false, nil
}
return false, statusErr
}
...@@ -374,7 +374,7 @@ func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID ...@@ -374,7 +374,7 @@ func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID
ctx2, cancel := context.WithTimeout(ctx, time.Minute*30) ctx2, cancel := context.WithTimeout(ctx, time.Minute*30)
defer cancel() defer cancel()
if err := retry.Do(ctx2, func() error { if err := retry.Do(ctx2, func() error {
has, err := h.s.hasCollection(ctx2, collectionID) has, err := h.s.broker.HasCollection(ctx2, collectionID)
if err != nil { if err != nil {
log.RatedInfo(60, "datacoord ServerHandler HasCollection retry failed", zap.Error(err)) log.RatedInfo(60, "datacoord ServerHandler HasCollection retry failed", zap.Error(err))
return err return err
......
...@@ -32,7 +32,6 @@ import ( ...@@ -32,7 +32,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
...@@ -44,12 +43,10 @@ import ( ...@@ -44,12 +43,10 @@ import (
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/merr"
...@@ -145,6 +142,9 @@ type Server struct { ...@@ -145,6 +142,9 @@ type Server struct {
//segReferManager *SegmentReferenceManager //segReferManager *SegmentReferenceManager
indexBuilder *indexBuilder indexBuilder *indexBuilder
indexNodeManager *IndexNodeManager indexNodeManager *IndexNodeManager
// manage ways that data coord access other coord
broker Broker
} }
// ServerHelper datacoord server injection helper // ServerHelper datacoord server injection helper
...@@ -322,6 +322,8 @@ func (s *Server) initDataCoord() error { ...@@ -322,6 +322,8 @@ func (s *Server) initDataCoord() error {
return err return err
} }
s.broker = NewCoordinatorBroker(s.rootCoordClient)
storageCli, err := s.newChunkManagerFactory() storageCli, err := s.newChunkManagerFactory()
if err != nil { if err != nil {
return err return err
...@@ -1006,33 +1008,12 @@ func (s *Server) stopServerLoop() { ...@@ -1006,33 +1008,12 @@ func (s *Server) stopServerLoop() {
// loadCollectionFromRootCoord communicates with RootCoord and asks for collection information. // loadCollectionFromRootCoord communicates with RootCoord and asks for collection information.
// collection information will be added to server meta info. // collection information will be added to server meta info.
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error { func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
resp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ resp, err := s.broker.DescribeCollectionInternal(ctx, collectionID)
Base: commonpbutil.NewMsgBase( if err != nil {
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
})
if err = VerifyResponse(resp, err); err != nil {
return err return err
} }
presp, err := s.rootCoordClient.ShowPartitionsInternal(ctx, &milvuspb.ShowPartitionsRequest{ partitionIDs, err := s.broker.ShowPartitionsInternal(ctx, collectionID)
Base: commonpbutil.NewMsgBase( if err != nil {
commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
commonpbutil.WithMsgID(0),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
/*
DbName: "",
CollectionName: resp.Schema.Name,
*/
CollectionID: resp.CollectionID,
})
if err = VerifyResponse(presp, err); err != nil {
log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
return err return err
} }
...@@ -1044,7 +1025,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i ...@@ -1044,7 +1025,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
collInfo := &collectionInfo{ collInfo := &collectionInfo{
ID: resp.CollectionID, ID: resp.CollectionID,
Schema: resp.Schema, Schema: resp.Schema,
Partitions: presp.PartitionIDs, Partitions: partitionIDs,
StartPositions: resp.GetStartPositions(), StartPositions: resp.GetStartPositions(),
Properties: properties, Properties: properties,
CreatedAt: resp.GetCreatedTimestamp(), CreatedAt: resp.GetCreatedTimestamp(),
...@@ -1053,32 +1034,6 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i ...@@ -1053,32 +1034,6 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
return nil return nil
} }
// hasCollection communicates with RootCoord and check whether this collection exist from the user's perspective.
func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, error) {
resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
DbName: "",
CollectionID: collectionID,
})
if err != nil {
return false, err
}
if resp == nil {
return false, errNilResponse
}
if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
return true, nil
}
statusErr := common.NewStatusError(resp.Status.ErrorCode, resp.Status.Reason)
if common.IsCollectionNotExistError(statusErr) {
return false, nil
}
return false, statusErr
}
func (s *Server) reCollectSegmentStats(ctx context.Context) { func (s *Server) reCollectSegmentStats(ctx context.Context) {
if s.channelManager == nil { if s.channelManager == nil {
log.Error("null channel manager found, which should NOT happen in non-testing environment") log.Error("null channel manager found, which should NOT happen in non-testing environment")
......
...@@ -188,6 +188,7 @@ func TestAssignSegmentID(t *testing.T) { ...@@ -188,6 +188,7 @@ func TestAssignSegmentID(t *testing.T) {
RootCoord: svr.rootCoordClient, RootCoord: svr.rootCoordClient,
collID: collID, collID: collID,
} }
schema := newTestSchema() schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{ svr.meta.AddCollection(&collectionInfo{
ID: collID, ID: collID,
...@@ -3489,6 +3490,7 @@ func TestGetFlushAllState(t *testing.T) { ...@@ -3489,6 +3490,7 @@ func TestGetFlushAllState(t *testing.T) {
var err error var err error
svr.meta = &meta{} svr.meta = &meta{}
svr.rootCoordClient = mocks.NewRootCoord(t) svr.rootCoordClient = mocks.NewRootCoord(t)
svr.broker = NewCoordinatorBroker(svr.rootCoordClient)
if test.ListDatabaseFailed { if test.ListDatabaseFailed {
svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything). svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything).
Return(&milvuspb.ListDatabasesResponse{ Return(&milvuspb.ListDatabasesResponse{
......
...@@ -643,18 +643,10 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf ...@@ -643,18 +643,10 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
return resp, nil return resp, nil
} }
dresp, err := s.rootCoordClient.DescribeCollectionInternal(s.ctx, &milvuspb.DescribeCollectionRequest{ dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID)
Base: commonpbutil.NewMsgBase( if err != nil {
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
})
if err = VerifyResponse(dresp, err); err != nil {
log.Error("get collection info from rootcoord failed", log.Error("get collection info from rootcoord failed",
zap.Error(err)) zap.Error(err))
resp.Status.Reason = err.Error() resp.Status.Reason = err.Error()
return resp, nil return resp, nil
} }
...@@ -781,15 +773,8 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI ...@@ -781,15 +773,8 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI
return resp, nil return resp, nil
} }
dresp, err := s.rootCoordClient.DescribeCollectionInternal(s.ctx, &milvuspb.DescribeCollectionRequest{ dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID)
Base: commonpbutil.NewMsgBase( if err != nil {
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
// please do not specify the collection name alone after database feature.
CollectionID: collectionID,
})
if err = VerifyResponse(dresp, err); err != nil {
log.Error("get collection info from rootcoord failed", log.Error("get collection info from rootcoord failed",
zap.Error(err)) zap.Error(err))
...@@ -1301,37 +1286,24 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll ...@@ -1301,37 +1286,24 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll
return resp, nil return resp, nil
} }
dbsRsp, err := s.rootCoordClient.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{ dbsRsp, err := s.broker.ListDatabases(ctx)
Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)), if err != nil {
})
if err = VerifyResponse(dbsRsp, err); err != nil {
log.Warn("failed to ListDatabases", zap.Error(err)) log.Warn("failed to ListDatabases", zap.Error(err))
resp.Status.Reason = err.Error() resp.Status.Reason = err.Error()
return resp, nil return resp, nil
} }
for _, dbName := range dbsRsp.DbNames { for _, dbName := range dbsRsp.DbNames {
showColRsp, err := s.rootCoordClient.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ showColRsp, err := s.broker.ShowCollections(ctx, dbName)
Base: commonpbutil.NewMsgBase( if err != nil {
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
),
DbName: dbName,
})
if err = VerifyResponse(showColRsp, err); err != nil {
log.Warn("failed to ShowCollections", zap.Error(err)) log.Warn("failed to ShowCollections", zap.Error(err))
resp.Status.Reason = err.Error() resp.Status.Reason = err.Error()
return resp, nil return resp, nil
} }
for _, collection := range showColRsp.GetCollectionIds() { for _, collection := range showColRsp.GetCollectionIds() {
describeColRsp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ describeColRsp, err := s.broker.DescribeCollectionInternal(ctx, collection)
Base: commonpbutil.NewMsgBase( if err != nil {
commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
),
// please do not specify the collection name alone after database feature.
CollectionID: collection,
})
if err = VerifyResponse(describeColRsp, err); err != nil {
log.Warn("failed to DescribeCollectionInternal", zap.Error(err)) log.Warn("failed to DescribeCollectionInternal", zap.Error(err))
resp.Status.Reason = err.Error() resp.Status.Reason = err.Error()
return resp, nil return resp, nil
......
...@@ -39,7 +39,7 @@ import ( ...@@ -39,7 +39,7 @@ import (
) )
const ( const (
brokerRPCTimeout = 10 * time.Second brokerRPCTimeout = 5 * time.Second
) )
type Broker interface { type Broker interface {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册