diff --git a/internal/datacoord/coordinator_broker.go b/internal/datacoord/coordinator_broker.go new file mode 100644 index 0000000000000000000000000000000000000000..46606d96971a6078e66f52d0bc10ada047ff56c2 --- /dev/null +++ b/internal/datacoord/coordinator_broker.go @@ -0,0 +1,156 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package datacoord + +import ( + "context" + "time" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/commonpbutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "go.uber.org/zap" +) + +const ( + brokerRPCTimeout = 5 * time.Second +) + +type Broker interface { + DescribeCollectionInternal(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error) + ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error) + ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error) + ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error) + HasCollection(ctx context.Context, collectionID int64) (bool, error) +} + +type CoordinatorBroker struct { + rootCoord types.RootCoord +} + +func NewCoordinatorBroker(rootCoord types.RootCoord) *CoordinatorBroker { + return &CoordinatorBroker{ + rootCoord: rootCoord, + } +} + +func (b *CoordinatorBroker) DescribeCollectionInternal(ctx context.Context, collectionID int64) (*milvuspb.DescribeCollectionResponse, error) { + ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + defer cancel() + resp, err := b.rootCoord.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + // please do not specify the collection name alone after database feature. + CollectionID: collectionID, + }) + if err = VerifyResponse(resp, err); err != nil { + log.Error("DescribeCollectionInternal failed", + zap.Int64("collectionID", collectionID), + zap.Error(err)) + return nil, err + } + + return resp, nil +} + +func (b *CoordinatorBroker) ShowPartitionsInternal(ctx context.Context, collectionID int64) ([]int64, error) { + ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + defer cancel() + resp, err := b.rootCoord.ShowPartitionsInternal(ctx, &milvuspb.ShowPartitionsRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), + commonpbutil.WithMsgID(0), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + // please do not specify the collection name alone after database feature. + CollectionID: collectionID, + }) + if err = VerifyResponse(resp, err); err != nil { + log.Error("ShowPartitionsInternal failed", + zap.Int64("collectionID", collectionID), + zap.Error(err)) + return nil, err + } + + return resp.PartitionIDs, nil +} + +func (b *CoordinatorBroker) ShowCollections(ctx context.Context, dbName string) (*milvuspb.ShowCollectionsResponse, error) { + ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + defer cancel() + resp, err := b.rootCoord.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), + ), + DbName: dbName, + }) + + if err = VerifyResponse(resp, err); err != nil { + log.Warn("ShowCollections failed", + zap.String("dbName", dbName), + zap.Error(err)) + return nil, err + } + + return resp, nil +} + +func (b *CoordinatorBroker) ListDatabases(ctx context.Context) (*milvuspb.ListDatabasesResponse, error) { + ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + defer cancel() + resp, err := b.rootCoord.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{ + Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)), + }) + if err = VerifyResponse(resp, err); err != nil { + log.Warn("failed to ListDatabases", zap.Error(err)) + return nil, err + } + return resp, nil +} + +// HasCollection communicates with RootCoord and check whether this collection exist from the user's perspective. +func (b *CoordinatorBroker) HasCollection(ctx context.Context, collectionID int64) (bool, error) { + ctx, cancel := context.WithTimeout(ctx, brokerRPCTimeout) + defer cancel() + resp, err := b.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ + Base: commonpbutil.NewMsgBase( + commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), + commonpbutil.WithSourceID(paramtable.GetNodeID()), + ), + // please do not specify the collection name alone after database feature. + CollectionID: collectionID, + }) + if err != nil { + return false, err + } + if resp == nil { + return false, errNilResponse + } + if resp.Status.ErrorCode == commonpb.ErrorCode_Success { + return true, nil + } + statusErr := common.NewStatusError(resp.Status.ErrorCode, resp.Status.Reason) + if common.IsCollectionNotExistError(statusErr) { + return false, nil + } + return false, statusErr +} diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 4eae36ff56fe58c71bdd790d1db932ad929e71fb..23ffc63bd3b18e1735f430f9da5a28c128f3b5ed 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -374,7 +374,7 @@ func (h *ServerHandler) HasCollection(ctx context.Context, collectionID UniqueID ctx2, cancel := context.WithTimeout(ctx, time.Minute*30) defer cancel() if err := retry.Do(ctx2, func() error { - has, err := h.s.hasCollection(ctx2, collectionID) + has, err := h.s.broker.HasCollection(ctx2, collectionID) if err != nil { log.RatedInfo(60, "datacoord ServerHandler HasCollection retry failed", zap.Error(err)) return err diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 7d1c31e0c6a011ed575ce989f23ecda96374b7fa..0cfcf4be40a223c31669976c9e4fb96940a8c3a8 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -32,7 +32,6 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" @@ -44,12 +43,10 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" - "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/logutil" "github.com/milvus-io/milvus/pkg/util/merr" @@ -145,6 +142,9 @@ type Server struct { //segReferManager *SegmentReferenceManager indexBuilder *indexBuilder indexNodeManager *IndexNodeManager + + // manage ways that data coord access other coord + broker Broker } // ServerHelper datacoord server injection helper @@ -322,6 +322,8 @@ func (s *Server) initDataCoord() error { return err } + s.broker = NewCoordinatorBroker(s.rootCoordClient) + storageCli, err := s.newChunkManagerFactory() if err != nil { return err @@ -1006,33 +1008,12 @@ func (s *Server) stopServerLoop() { // loadCollectionFromRootCoord communicates with RootCoord and asks for collection information. // collection information will be added to server meta info. func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error { - resp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - // please do not specify the collection name alone after database feature. - CollectionID: collectionID, - }) - if err = VerifyResponse(resp, err); err != nil { + resp, err := s.broker.DescribeCollectionInternal(ctx, collectionID) + if err != nil { return err } - presp, err := s.rootCoordClient.ShowPartitionsInternal(ctx, &milvuspb.ShowPartitionsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions), - commonpbutil.WithMsgID(0), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - // please do not specify the collection name alone after database feature. - /* - DbName: "", - CollectionName: resp.Schema.Name, - */ - CollectionID: resp.CollectionID, - }) - if err = VerifyResponse(presp, err); err != nil { - log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name), - zap.Int64("collectionID", resp.CollectionID), zap.Error(err)) + partitionIDs, err := s.broker.ShowPartitionsInternal(ctx, collectionID) + if err != nil { return err } @@ -1044,7 +1025,7 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i collInfo := &collectionInfo{ ID: resp.CollectionID, Schema: resp.Schema, - Partitions: presp.PartitionIDs, + Partitions: partitionIDs, StartPositions: resp.GetStartPositions(), Properties: properties, CreatedAt: resp.GetCreatedTimestamp(), @@ -1053,32 +1034,6 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i return nil } -// hasCollection communicates with RootCoord and check whether this collection exist from the user's perspective. -func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, error) { - resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - DbName: "", - CollectionID: collectionID, - }) - if err != nil { - return false, err - } - if resp == nil { - return false, errNilResponse - } - if resp.Status.ErrorCode == commonpb.ErrorCode_Success { - return true, nil - } - statusErr := common.NewStatusError(resp.Status.ErrorCode, resp.Status.Reason) - if common.IsCollectionNotExistError(statusErr) { - return false, nil - } - return false, statusErr -} - func (s *Server) reCollectSegmentStats(ctx context.Context) { if s.channelManager == nil { log.Error("null channel manager found, which should NOT happen in non-testing environment") diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 73b5eb5ac7958cdffc69bc6903db190d4f764313..c2af5cfa9b9528d229ac58f8556f9b8ca039aa5b 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -188,6 +188,7 @@ func TestAssignSegmentID(t *testing.T) { RootCoord: svr.rootCoordClient, collID: collID, } + schema := newTestSchema() svr.meta.AddCollection(&collectionInfo{ ID: collID, @@ -3489,6 +3490,7 @@ func TestGetFlushAllState(t *testing.T) { var err error svr.meta = &meta{} svr.rootCoordClient = mocks.NewRootCoord(t) + svr.broker = NewCoordinatorBroker(svr.rootCoordClient) if test.ListDatabaseFailed { svr.rootCoordClient.(*mocks.RootCoord).EXPECT().ListDatabases(mock.Anything, mock.Anything). Return(&milvuspb.ListDatabasesResponse{ diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index c8bddc52421498bba5e085043ce96407aafe6a0b..832458c83f5055dbaf98d7a4eb2607d69051fcd8 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -643,18 +643,10 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf return resp, nil } - dresp, err := s.rootCoordClient.DescribeCollectionInternal(s.ctx, &milvuspb.DescribeCollectionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - // please do not specify the collection name alone after database feature. - CollectionID: collectionID, - }) - if err = VerifyResponse(dresp, err); err != nil { + dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID) + if err != nil { log.Error("get collection info from rootcoord failed", zap.Error(err)) - resp.Status.Reason = err.Error() return resp, nil } @@ -781,15 +773,8 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI return resp, nil } - dresp, err := s.rootCoordClient.DescribeCollectionInternal(s.ctx, &milvuspb.DescribeCollectionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), - commonpbutil.WithSourceID(paramtable.GetNodeID()), - ), - // please do not specify the collection name alone after database feature. - CollectionID: collectionID, - }) - if err = VerifyResponse(dresp, err); err != nil { + dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID) + if err != nil { log.Error("get collection info from rootcoord failed", zap.Error(err)) @@ -1301,37 +1286,24 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll return resp, nil } - dbsRsp, err := s.rootCoordClient.ListDatabases(ctx, &milvuspb.ListDatabasesRequest{ - Base: commonpbutil.NewMsgBase(commonpbutil.WithMsgType(commonpb.MsgType_ListDatabases)), - }) - if err = VerifyResponse(dbsRsp, err); err != nil { + dbsRsp, err := s.broker.ListDatabases(ctx) + if err != nil { log.Warn("failed to ListDatabases", zap.Error(err)) resp.Status.Reason = err.Error() return resp, nil } for _, dbName := range dbsRsp.DbNames { - showColRsp, err := s.rootCoordClient.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), - ), - DbName: dbName, - }) - if err = VerifyResponse(showColRsp, err); err != nil { + showColRsp, err := s.broker.ShowCollections(ctx, dbName) + if err != nil { log.Warn("failed to ShowCollections", zap.Error(err)) resp.Status.Reason = err.Error() return resp, nil } for _, collection := range showColRsp.GetCollectionIds() { - describeColRsp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection), - ), - // please do not specify the collection name alone after database feature. - CollectionID: collection, - }) - if err = VerifyResponse(describeColRsp, err); err != nil { + describeColRsp, err := s.broker.DescribeCollectionInternal(ctx, collection) + if err != nil { log.Warn("failed to DescribeCollectionInternal", zap.Error(err)) resp.Status.Reason = err.Error() return resp, nil diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index b1ca6b659ca656c884fdb597b8cd72db675c0b12..abd454e53753cc551a3e1b7192bd89893e717e3c 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -39,7 +39,7 @@ import ( ) const ( - brokerRPCTimeout = 10 * time.Second + brokerRPCTimeout = 5 * time.Second ) type Broker interface {