From b92ff69cea1674084097ccf855112341f8ceba80 Mon Sep 17 00:00:00 2001 From: dragondriver Date: Thu, 4 Feb 2021 11:08:36 +0800 Subject: [PATCH] Implement GetIndexState in Proxy Signed-off-by: dragondriver --- internal/proxynode/impl.go | 6 +- internal/proxynode/interface.go | 1 + internal/proxynode/task.go | 98 ++++++++++++++++++++++-- internal/querynode/flow_graph_gc_node.go | 34 ++++---- 4 files changed, 115 insertions(+), 24 deletions(-) diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index d0c898e5b..27163e8d1 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -442,8 +442,10 @@ func (node *NodeImpl) GetIndexState(request *milvuspb.IndexStateRequest) (*milvu ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) defer cancel() dipt := &GetIndexStateTask{ - Condition: NewTaskCondition(ctx), - IndexStateRequest: request, + Condition: NewTaskCondition(ctx), + IndexStateRequest: request, + indexServiceClient: node.indexServiceClient, + masterClientInterface: node.masterClient, } err := node.sched.DdQueue.Enqueue(dipt) diff --git a/internal/proxynode/interface.go b/internal/proxynode/interface.go index d7f8b048b..cb1fb6332 100644 --- a/internal/proxynode/interface.go +++ b/internal/proxynode/interface.go @@ -23,6 +23,7 @@ type MasterClient interface { CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) + DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) } type IndexServiceClient interface { diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index f29de8e2c..f498d0c83 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -6,6 +6,8 @@ import ( "math" "strconv" + "github.com/zilliztech/milvus-distributed/internal/proto/indexpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/golang/protobuf/proto" @@ -1285,8 +1287,9 @@ func (dit *DescribeIndexTask) PostExecute() error { type GetIndexStateTask struct { Condition *milvuspb.IndexStateRequest - indexServiceClient IndexServiceClient - result *milvuspb.IndexStateResponse + indexServiceClient IndexServiceClient + masterClientInterface MasterClient + result *milvuspb.IndexStateResponse } func (dipt *GetIndexStateTask) OnEnqueue() error { @@ -1336,17 +1339,98 @@ func (dipt *GetIndexStateTask) PreExecute() error { } func (dipt *GetIndexStateTask) Execute() error { - // TODO: use index service client - //var err error - //dipt.result, err = dipt.masterClient.GetIndexState(dipt.IndexStateRequest) - //return err + collectionName := dipt.CollectionName + collectionID, err := globalMetaCache.GetCollectionID(collectionName) + if err != nil { // err is not nil if collection not exists + return err + } + + showPartitionRequest := &milvuspb.ShowPartitionRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowPartitions, + MsgID: dipt.Base.MsgID, + Timestamp: dipt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + DbName: dipt.DbName, + CollectionName: collectionName, + CollectionID: collectionID, + } + partitions, err := dipt.masterClientInterface.ShowPartitions(showPartitionRequest) + if err != nil { + return err + } + + for _, partitionID := range partitions.PartitionIDs { + showSegmentsRequest := &milvuspb.ShowSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kShowSegment, + MsgID: dipt.Base.MsgID, + Timestamp: dipt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + CollectionID: collectionID, + PartitionID: partitionID, + } + segments, err := dipt.masterClientInterface.ShowSegments(showSegmentsRequest) + if err != nil { + return err + } + + getIndexStatesRequest := &indexpb.IndexStatesRequest{ + IndexBuildIDs: make([]UniqueID, 0), + } + for _, segmentID := range segments.SegmentIDs { + describeSegmentRequest := &milvuspb.DescribeSegmentRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_kDescribeSegment, + MsgID: dipt.Base.MsgID, + Timestamp: dipt.Base.Timestamp, + SourceID: Params.ProxyID, + }, + CollectionID: collectionID, + SegmentID: segmentID, + } + segmentDesc, err := dipt.masterClientInterface.DescribeSegment(describeSegmentRequest) + if err != nil { + return err + } + + getIndexStatesRequest.IndexBuildIDs = append(getIndexStatesRequest.IndexBuildIDs, segmentDesc.BuildID) + } + + states, err := dipt.indexServiceClient.GetIndexStates(getIndexStatesRequest) + if err != nil { + return err + } + + if states.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + dipt.result = &milvuspb.IndexStateResponse{ + Status: states.Status, + State: commonpb.IndexState_FAILED, + } + return nil + } + + for _, state := range states.States { + if state.State != commonpb.IndexState_FINISHED { + dipt.result = &milvuspb.IndexStateResponse{ + Status: states.Status, + State: state.State, + } + return nil + } + } + } + dipt.result = &milvuspb.IndexStateResponse{ Status: &commonpb.Status{ - ErrorCode: 0, + ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", }, State: commonpb.IndexState_FINISHED, } + return nil } diff --git a/internal/querynode/flow_graph_gc_node.go b/internal/querynode/flow_graph_gc_node.go index 6c156547d..dc468ed07 100644 --- a/internal/querynode/flow_graph_gc_node.go +++ b/internal/querynode/flow_graph_gc_node.go @@ -21,27 +21,31 @@ func (gcNode *gcNode) Operate(in []*Msg) []*Msg { // TODO: add error handling } - gcMsg, ok := (*in[0]).(*gcMsg) + _, ok := (*in[0]).(*gcMsg) if !ok { log.Println("type assertion failed for gcMsg") // TODO: add error handling } - // drop collections - for _, collectionID := range gcMsg.gcRecord.collections { - err := gcNode.replica.removeCollection(collectionID) - if err != nil { - log.Println(err) - } - } + // Use `releasePartition` and `releaseCollection`, + // because if we drop collections or partitions here, query service doesn't know this behavior, + // which would lead the wrong result of `showCollections` or `showPartition` - // drop partitions - for _, partition := range gcMsg.gcRecord.partitions { - err := gcNode.replica.removePartition(partition.collectionID, partition.partitionID) - if err != nil { - log.Println(err) - } - } + //// drop collections + //for _, collectionID := range gcMsg.gcRecord.collections { + // err := gcNode.replica.removeCollection(collectionID) + // if err != nil { + // log.Println(err) + // } + //} + // + //// drop partitions + //for _, partition := range gcMsg.gcRecord.partitions { + // err := gcNode.replica.removePartition(partition.collectionID, partition.partitionID) + // if err != nil { + // log.Println(err) + // } + //} return nil } -- GitLab