diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go index dee0e36d34d3e15c2e82f7ae01f929282e83045d..4e8aca99e9835e319f756b7b1ae9110e4ede1f50 100644 --- a/internal/distributed/indexnode/client/client.go +++ b/internal/distributed/indexnode/client/client.go @@ -153,16 +153,9 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp return ret.(*milvuspb.StringResponse), err } -func (c *Client) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) { +func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.BuildIndex(ctx, req) - }) - return ret.(*commonpb.Status), err -} - -func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) { - ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.DropIndex(ctx, req) + return c.grpcClient.CreateIndex(ctx, req) }) return ret.(*commonpb.Status), err } diff --git a/internal/distributed/indexnode/service.go b/internal/distributed/indexnode/service.go index 611fa6702aadcf93841ca1734dda9a5d1416543f..fd90ea44a97d654d0ed13e073a57885162b33f28 100644 --- a/internal/distributed/indexnode/service.go +++ b/internal/distributed/indexnode/service.go @@ -191,12 +191,8 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetSt return s.indexnode.GetStatisticsChannel(ctx) } -func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) { - return s.indexnode.BuildIndex(ctx, req) -} - -func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) { - return s.indexnode.DropIndex(ctx, request) +func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) { + return s.indexnode.CreateIndex(ctx, req) } func NewServer(ctx context.Context) (*Server, error) { diff --git a/internal/distributed/indexservice/client/client.go b/internal/distributed/indexservice/client/client.go index 08fde8d312ebeb570c82afe9a9752338a8e06daf..4080bcdf40e044d5e399374fc73242284e6b8b65 100644 --- a/internal/distributed/indexservice/client/client.go +++ b/internal/distributed/indexservice/client/client.go @@ -217,10 +217,3 @@ func (c *Client) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil }) return ret.(*indexpb.GetIndexFilePathsResponse), err } - -func (c *Client) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) { - ret, err := c.recall(func() (interface{}, error) { - return c.grpcClient.NotifyBuildIndex(ctx, nty) - }) - return ret.(*commonpb.Status), err -} diff --git a/internal/distributed/indexservice/service.go b/internal/distributed/indexservice/service.go index 7e03c6f625203f4c6ca42dd813e1e4a9c36d7cf5..c7043e45436792c996139e1c1496a74e7c6a6ed0 100644 --- a/internal/distributed/indexservice/service.go +++ b/internal/distributed/indexservice/service.go @@ -149,9 +149,6 @@ func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil return s.indexservice.GetIndexFilePaths(ctx, req) } -func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) { - return s.indexservice.NotifyBuildIndex(ctx, nty) -} func (s *Server) startGrpcLoop(grpcPort int) { defer s.loopWg.Done() diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go index 457667dbf1a6a0654042605e1d334c3c5fc00fd8..7af124c63968c665829ec8ea3183263aff01e1c2 100644 --- a/internal/indexnode/indexnode.go +++ b/internal/indexnode/indexnode.go @@ -19,9 +19,13 @@ import ( "strconv" "time" + "github.com/milvus-io/milvus/internal/util/retry" + + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" miniokv "github.com/milvus-io/milvus/internal/kv/minio" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -58,6 +62,9 @@ type IndexNode struct { startCallbacks []func() closeCallbacks []func() + etcdKV *etcdkv.EtcdKV + finishedTasks map[UniqueID]commonpb.IndexState + closer io.Closer } @@ -88,7 +95,17 @@ func (i *IndexNode) Register() error { func (i *IndexNode) Init() error { ctx := context.Background() - err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200) + connectEtcdFn := func() error { + etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) + i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath) + return err + } + err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn) + if err != nil { + return err + } + + err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200) if err != nil { return err } @@ -98,6 +115,7 @@ func (i *IndexNode) Init() error { Ip: Params.IP, Port: int64(Params.Port), }, + ServerID: i.session.ServerID, } resp, err2 := i.serviceClient.RegisterNode(ctx, request) @@ -164,11 +182,13 @@ func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) { i.serviceClient = serviceClient } -func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexRequest) (*commonpb.Status, error) { +func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateIndexRequest) (*commonpb.Status, error) { log.Debug("indexnode building index ...", zap.Int64("IndexBuildID", request.IndexBuildID), zap.String("Indexname", request.IndexName), zap.Int64("IndexID", request.IndexID), + zap.Int64("Version", request.Version), + zap.String("MetaPath", request.MetaPath), zap.Strings("DataPaths", request.DataPaths), zap.Any("TypeParams", request.TypeParams), zap.Any("IndexParams", request.IndexParams)) @@ -180,6 +200,7 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexR }, req: request, kv: i.kv, + etcdKV: i.etcdKV, serviceClient: i.serviceClient, nodeID: Params.NodeID, } @@ -194,34 +215,9 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexR ret.Reason = err.Error() return ret, nil } - log.Debug("indexnode", zap.Int64("indexnode successfully schedule with indexBuildID", request.IndexBuildID)) - return ret, nil -} + log.Debug("IndexNode", zap.Int64("IndexNode successfully schedule with indexBuildID", request.IndexBuildID)) -func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) { - log.Debug("IndexNode", zap.Int64("Drop index by id", request.IndexID)) - indexBuildIDs := i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID) - log.Debug("IndexNode", zap.Any("The index of the IndexBuildIDs to be deleted", indexBuildIDs)) - for _, indexBuildID := range indexBuildIDs { - nty := &indexpb.NotifyBuildIndexRequest{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexBuildID: indexBuildID, - NodeID: Params.NodeID, - IndexFilePaths: []string{}, - } - resp, err := i.serviceClient.NotifyBuildIndex(ctx, nty) - if err != nil { - log.Warn("IndexNode", zap.String("DropIndex notify error", err.Error())) - } else if resp.ErrorCode != commonpb.ErrorCode_Success { - log.Warn("IndexNode", zap.String("DropIndex notify error reason", resp.Reason)) - } - } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return ret, nil } // AddStartCallback adds a callback in the startServer phase. diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 9119eccf60480a14441bfa0a844c5de1c1ad3438..506394872d991741db77a96c66f81e1251f6718c 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -16,10 +16,15 @@ import ( "errors" "runtime" "strconv" + "time" + "github.com/milvus-io/milvus/internal/util/retry" + + "github.com/golang/protobuf/proto" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -83,8 +88,9 @@ type IndexBuildTask struct { BaseTask index Index kv kv.BaseKV + etcdKV *etcdkv.EtcdKV savePaths []string - req *indexpb.BuildIndexRequest + req *indexpb.CreateIndexRequest serviceClient types.IndexService nodeID UniqueID } @@ -111,9 +117,55 @@ func (it *IndexBuildTask) OnEnqueue() error { return nil } +func (it *IndexBuildTask) checkIndexMeta(pre bool) error { + fn := func() error { + indexMeta := indexpb.IndexMeta{} + _, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath) + if err != nil { + log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath)) + log.Debug("IndexService", zap.Any("Load meta error", err)) + return err + } + err = proto.UnmarshalText(values[0], &indexMeta) + if err != nil { + log.Debug("IndexService", zap.Any("Unmarshal error", err)) + return err + } + if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished { + log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version")) + return nil + } + if indexMeta.MarkDeleted { + indexMeta.State = commonpb.IndexState_Finished + v := proto.MarshalTextString(&indexMeta) + err := it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], v) + if err != nil { + return err + } + return nil + } + if pre { + return nil + } + indexMeta.IndexFilePaths = it.savePaths + indexMeta.State = commonpb.IndexState_Finished + if it.err != nil { + indexMeta.State = commonpb.IndexState_Failed + } + log.Debug("IndexNode", zap.Any("MetaPath", it.req.MetaPath)) + err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], + proto.MarshalTextString(&indexMeta)) + return err + } + + err := retry.Retry(3, time.Millisecond*200, fn) + return err + +} + func (it *IndexBuildTask) PreExecute(ctx context.Context) error { log.Debug("preExecute...") - return nil + return it.checkIndexMeta(true) } func (it *IndexBuildTask) PostExecute(ctx context.Context) error { @@ -131,30 +183,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error { return err } - nty := &indexpb.NotifyBuildIndexRequest{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IndexBuildID: it.req.IndexBuildID, - NodeID: it.nodeID, - IndexFilePaths: it.savePaths, - } - if it.err != nil { - nty.Status.ErrorCode = commonpb.ErrorCode_BuildIndexError - } - - ctx = context.TODO() - resp, err := it.serviceClient.NotifyBuildIndex(ctx, nty) - if err != nil { - log.Warn("indexnode", zap.String("error", err.Error())) - return err - } - - if resp.ErrorCode != commonpb.ErrorCode_Success { - err = errors.New(resp.Reason) - log.Debug("indexnode", zap.String("[IndexBuildTask][PostExecute] err", err.Error())) - } - return err + return it.checkIndexMeta(false) } func (it *IndexBuildTask) Execute(ctx context.Context) error { @@ -309,7 +338,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { getSavePathByKey := func(key string) string { // TODO: fix me, use more reasonable method - return strconv.Itoa(int(it.req.IndexBuildID)) + "/" + strconv.Itoa(int(partitionID)) + "/" + strconv.Itoa(int(segmentID)) + "/" + key + return strconv.Itoa(int(it.req.IndexBuildID)) + "/" + strconv.Itoa(int(it.req.Version)) + "/" + strconv.Itoa(int(partitionID)) + "/" + strconv.Itoa(int(segmentID)) + "/" + key } saveBlob := func(path string, value []byte) error { return it.kv.Save(path, string(value)) @@ -322,7 +351,26 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error { savePath := getSavePathByKey(key) - err := saveBlob(savePath, value) + saveIndexFileFn := func() error { + v, err := it.etcdKV.Load(it.req.MetaPath) + if err != nil { + log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath)) + log.Debug("IndexService", zap.Any("Load meta error", err)) + return err + } + indexMeta := indexpb.IndexMeta{} + err = proto.UnmarshalText(v, &indexMeta) + if err != nil { + log.Debug("IndexService", zap.Any("Unmarshal error", err)) + return err + } + if indexMeta.Version > it.req.Version { + log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version")) + return errors.New("This task has been reassigned ") + } + return saveBlob(savePath, value) + } + err := retry.Retry(5, time.Millisecond*200, saveIndexFileFn) if err != nil { return err } diff --git a/internal/indexservice/indexservice.go b/internal/indexservice/indexservice.go index 3e3d3647260082d1718869badaa80a3a8604bf28..d1584985077eb0000b7376c9cb6a6c987ded3970 100644 --- a/internal/indexservice/indexservice.go +++ b/internal/indexservice/indexservice.go @@ -15,9 +15,13 @@ import ( "context" "errors" "math/rand" + "strconv" "sync" "time" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/golang/protobuf/proto" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/allocator" @@ -34,13 +38,12 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" ) const ( reqTimeoutInterval = time.Second * 10 durationInterval = time.Second * 10 - dropIndexLimit = 20 + recycleIndexLimit = 20 ) type IndexService struct { @@ -57,12 +60,18 @@ type IndexService struct { sched *TaskScheduler session *sessionutil.Session + eventChan <-chan *sessionutil.SessionEvent + + assignChan chan []UniqueID + idAllocator *allocator.GlobalIDAllocator kv kv.BaseKV metaTable *metaTable + nodeTasks *nodeTasks + nodeLock sync.RWMutex // Add callback functions at different stages @@ -80,6 +89,7 @@ func NewIndexService(ctx context.Context) (*IndexService, error) { loopCtx: ctx1, loopCancel: cancel, nodeClients: &PriorityQueue{}, + nodeTasks: &nodeTasks{}, } return i, nil @@ -89,12 +99,14 @@ func NewIndexService(ctx context.Context) (*IndexService, error) { func (i *IndexService) Register() error { i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress}) i.session.Init(typeutil.IndexServiceRole, Params.Address, true) + i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, 0) return nil } func (i *IndexService) Init() error { log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress)) + i.assignChan = make(chan []UniqueID, 1024) connectEtcdFn := func() error { etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) if err != nil { @@ -144,6 +156,13 @@ func (i *IndexService) Init() error { return err } i.UpdateStateCode(internalpb.StateCode_Healthy) + + i.nodeTasks = NewNodeTasks() + + err = i.assignTasksServerStart() + if err != nil { + return err + } return nil } @@ -152,7 +171,16 @@ func (i *IndexService) Start() error { go i.tsLoop() i.loopWg.Add(1) - go i.dropIndexLoop() + go i.recycleUnusedIndexFiles() + + i.loopWg.Add(1) + go i.assignmentTasksLoop() + + i.loopWg.Add(1) + go i.watchNodeLoop() + + i.loopWg.Add(1) + go i.watchMetaLoop() i.sched.Start() // Start callbacks @@ -218,13 +246,24 @@ func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri } func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) { - log.Debug("indexservice building index ...", + log.Debug("IndexService building index ...", zap.Int64("IndexBuildID", req.IndexBuildID), zap.String("IndexName = ", req.IndexName), zap.Int64("IndexID = ", req.IndexID), zap.Strings("DataPath = ", req.DataPaths), zap.Any("TypeParams", req.TypeParams), zap.Any("IndexParams", req.IndexParams)) + hasIndex, indexBuildID := i.metaTable.HasSameReq(req) + if hasIndex { + log.Debug("IndexService", zap.Any("hasIndex true", indexBuildID)) + return &indexpb.BuildIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "already have same index", + }, + IndexBuildID: indexBuildID, + }, nil + } ret := &indexpb.BuildIndexResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -273,6 +312,7 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe ret.Status.Reason = err.Error() return ret, nil } + i.assignChan <- []UniqueID{t.indexBuildID} ret.Status.ErrorCode = commonpb.ErrorCode_Success ret.IndexBuildID = t.indexBuildID return ret, nil @@ -322,13 +362,6 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ i.metaTable.DeleteIndex(indexBuildID) } }() - - go func() { - allNodeClients := i.nodeClients.PeekAllClients() - for _, client := range allNodeClients { - client.DropIndex(ctx, req) - } - }() }() log.Debug("IndexService", zap.Int64("DropIndex success by ID", req.IndexID)) @@ -359,27 +392,6 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn return ret, nil } -func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) { - log.Debug("IndexService", - zap.Int64("notify build index", nty.IndexBuildID), - zap.Strings("index file paths", nty.IndexFilePaths), - zap.Int64("node ID", nty.NodeID)) - ret := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - } - log.Debug("IndexService", zap.String("[IndexService][NotifyBuildIndex]", nty.String())) - if err := i.metaTable.NotifyBuildIndex(nty); err != nil { - ret.ErrorCode = commonpb.ErrorCode_BuildIndexError - ret.Reason = err.Error() - log.Debug("IndexService", zap.String("[IndexService][NotifyBuildIndex][metaTable][NotifyBuildIndex]", err.Error())) - } - - log.Debug("IndexService", zap.Any("Index build completed with ID", nty.IndexBuildID)) - - i.nodeClients.IncPriority(nty.NodeID, -1) - return ret, nil -} - func (i *IndexService) tsLoop() { tsoTicker := time.NewTicker(tso.UpdateTimestampStep) defer tsoTicker.Stop() @@ -401,27 +413,177 @@ func (i *IndexService) tsLoop() { } } -func (i *IndexService) dropIndexLoop() { +func (i *IndexService) recycleUnusedIndexFiles() { ctx, cancel := context.WithCancel(i.loopCtx) defer cancel() defer i.loopWg.Done() timeTicker := time.NewTicker(durationInterval) + log.Debug("IndexService start recycle unused index files loop") - log.Debug("IndexService start drop index loop") for { select { case <-ctx.Done(): return case <-timeTicker.C: - indexMetas := i.metaTable.getMarkDeleted(dropIndexLimit) - for j := 0; j < len(indexMetas); j++ { - if err := i.kv.MultiRemove(indexMetas[j].IndexFilePaths); err != nil { - log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + metas := i.metaTable.GetUnusedIndexFiles(recycleIndexLimit) + for _, meta := range metas { + if meta.indexMeta.MarkDeleted { + unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil { + log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + } + i.metaTable.DeleteIndex(meta.indexMeta.IndexBuildID) + } else { + for j := 1; j < int(meta.indexMeta.Version); j++ { + unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j) + if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil { + log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + } + } + if err := i.metaTable.UpdateRecycleState(meta.indexMeta.IndexBuildID); err != nil { + log.Debug("IndexService", zap.String("Remove index files error", err.Error())) + } } - i.metaTable.DeleteIndex(indexMetas[j].IndexBuildID) } } } } + +func (i *IndexService) assignmentTasksLoop() { + ctx, cancel := context.WithCancel(i.loopCtx) + + defer cancel() + defer i.loopWg.Done() + + log.Debug("IndexService start assign tasks loop") + + for { + select { + case <-ctx.Done(): + return + case indexBuildIDs := <-i.assignChan: + for _, indexBuildID := range indexBuildIDs { + meta := i.metaTable.GetIndexMeta(indexBuildID) + log.Debug("IndexService", zap.Any("Meta", meta)) + if meta.indexMeta.State == commonpb.IndexState_Finished { + continue + } + if err := i.metaTable.UpdateVersion(indexBuildID); err != nil { + log.Debug("IndexService", zap.String("build index update version err", err.Error())) + } + nodeID, builderClient, nodeServerID := i.nodeClients.PeekClient() + if builderClient == nil { + log.Debug("IndexService has no available IndexNode") + i.assignChan <- []UniqueID{indexBuildID} + continue + } + req := &indexpb.CreateIndexRequest{ + IndexBuildID: indexBuildID, + IndexName: meta.indexMeta.Req.IndexName, + IndexID: meta.indexMeta.Req.IndexID, + Version: meta.indexMeta.Version + 1, + MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10), + DataPaths: meta.indexMeta.Req.DataPaths, + TypeParams: meta.indexMeta.Req.TypeParams, + IndexParams: meta.indexMeta.Req.IndexParams, + } + resp, err := builderClient.CreateIndex(ctx, req) + if err != nil { + log.Debug("IndexService", zap.String("build index err", err.Error())) + } + if err = i.metaTable.BuildIndex(indexBuildID, nodeServerID); err != nil { + log.Debug("IndexService", zap.String("update meta table error", err.Error())) + } + if resp.ErrorCode != commonpb.ErrorCode_Success { + log.Debug("IndexService", zap.String("build index err", resp.Reason)) + } + i.nodeClients.IncPriority(nodeID, 1) + } + } + } +} + +func (i *IndexService) watchNodeLoop() { + ctx, cancel := context.WithCancel(i.loopCtx) + + defer cancel() + defer i.loopWg.Done() + log.Debug("IndexService start watch node loop") + + for { + select { + case <-ctx.Done(): + return + case event := <-i.eventChan: + switch event.EventType { + case sessionutil.SessionAddEvent: + serverID := event.Session.ServerID + log.Debug("IndexService", zap.Any("Add IndexNode, session serverID", serverID)) + case sessionutil.SessionDelEvent: + serverID := event.Session.ServerID + log.Debug("IndexService", zap.Any("The IndexNode crashed with ID", serverID)) + indexBuildIDs := i.nodeTasks.getTasksByLeaseKey(serverID) + i.assignChan <- indexBuildIDs + i.nodeTasks.delete(serverID) + } + } + } +} + +func (i *IndexService) watchMetaLoop() { + ctx, cancel := context.WithCancel(i.loopCtx) + + defer cancel() + defer i.loopWg.Done() + log.Debug("IndexService start watch meta loop") + + watchChan := i.metaTable.client.WatchWithPrefix("indexes") + + for { + select { + case <-ctx.Done(): + return + case resp := <-watchChan: + log.Debug("meta updated.") + for _, event := range resp.Events { + eventRevision := event.Kv.Version + indexMeta := &indexpb.IndexMeta{} + err := proto.UnmarshalText(string(event.Kv.Value), indexMeta) + if err != nil { + log.Debug("IndexService", zap.Any("Unmarshal error", err)) + } + indexBuildID := indexMeta.IndexBuildID + switch event.Type { + case mvccpb.PUT: + //TODO: get indexBuildID fast + log.Debug("IndexService", zap.Any("Meta need load by IndexBuildID", indexBuildID)) + + reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision) + if reload { + i.nodeTasks.finishTask(indexBuildID) + } + case mvccpb.DELETE: + } + } + } + } +} + +func (i *IndexService) assignTasksServerStart() error { + sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole) + if err != nil { + return err + } + var serverIDs []int64 + for _, session := range sessions { + serverIDs = append(serverIDs, session.ServerID) + } + tasks := i.metaTable.GetUnassignedTasks(serverIDs) + for _, taskQueue := range tasks { + i.assignChan <- taskQueue + } + + return nil +} diff --git a/internal/indexservice/meta_table.go b/internal/indexservice/meta_table.go index 02be7c0e9f4074eaa17df4d2275283241c14c9ce..2e8eb4be9bea7527107f407b8f81309220b962e9 100644 --- a/internal/indexservice/meta_table.go +++ b/internal/indexservice/meta_table.go @@ -15,24 +15,37 @@ import ( "fmt" "strconv" "sync" + "time" + + "github.com/milvus-io/milvus/internal/util/retry" "go.uber.org/zap" "github.com/golang/protobuf/proto" - "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" ) +const ( + RequestTimeout = 10 * time.Second + maxTasks = 10 +) + +type Meta struct { + indexMeta *indexpb.IndexMeta + revision int64 +} + type metaTable struct { - client kv.TxnKV // client of a reliable kv service, i.e. etcd client - indexBuildID2Meta map[UniqueID]indexpb.IndexMeta // index build id to index meta + client *etcdkv.EtcdKV // client of a reliable kv service, i.e. etcd client + indexBuildID2Meta map[UniqueID]Meta // index build id to index meta lock sync.RWMutex } -func NewMetaTable(kv kv.TxnKV) (*metaTable, error) { +func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) { mt := &metaTable{ client: kv, lock: sync.RWMutex{}, @@ -46,31 +59,70 @@ func NewMetaTable(kv kv.TxnKV) (*metaTable, error) { } func (mt *metaTable) reloadFromKV() error { - mt.indexBuildID2Meta = make(map[UniqueID]indexpb.IndexMeta) + mt.indexBuildID2Meta = make(map[UniqueID]Meta) + key := "indexes" + log.Debug("LoadWithPrefix ", zap.String("prefix", key)) - _, values, err := mt.client.LoadWithPrefix("indexes") + _, values, versions, err := mt.client.LoadWithPrefix2(key) if err != nil { return err } - for _, value := range values { + for i := 0; i < len(values); i++ { indexMeta := indexpb.IndexMeta{} - err = proto.UnmarshalText(value, &indexMeta) + err = proto.UnmarshalText(values[i], &indexMeta) if err != nil { return fmt.Errorf("IndexService metaTable reloadFromKV UnmarshalText indexpb.IndexMeta err:%w", err) } - mt.indexBuildID2Meta[indexMeta.IndexBuildID] = indexMeta + + meta := &Meta{ + indexMeta: &indexMeta, + revision: versions[i], + } + mt.indexBuildID2Meta[indexMeta.IndexBuildID] = *meta } return nil } // metaTable.lock.Lock() before call this function -func (mt *metaTable) saveIndexMeta(meta *indexpb.IndexMeta) error { - value := proto.MarshalTextString(meta) +func (mt *metaTable) saveIndexMeta(meta *Meta) error { + value := proto.MarshalTextString(meta.indexMeta) + + key := "indexes/" + strconv.FormatInt(meta.indexMeta.IndexBuildID, 10) + log.Debug("LoadWithPrefix ", zap.String("prefix", key)) + + err := mt.client.CompareVersionAndSwap(key, meta.revision, value) + if err != nil { + return err + } + + meta.revision = meta.revision + 1 + mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta + + return nil +} + +func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) { + key := "indexes/" + strconv.FormatInt(indexBuildID, 10) - mt.indexBuildID2Meta[meta.IndexBuildID] = *meta + _, values, version, err := mt.client.LoadWithPrefix2(key) + if err != nil { + return nil, err + } + im := &indexpb.IndexMeta{} + err = proto.UnmarshalText(values[0], im) + if err != nil { + return nil, err + } + if im.State == commonpb.IndexState_Finished { + return nil, nil + } + m := &Meta{ + revision: version[0], + indexMeta: im, + } - return mt.client.Save("/indexes/"+strconv.FormatInt(meta.IndexBuildID, 10), value) + return m, nil } func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequest) error { @@ -81,67 +133,115 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ if ok { return fmt.Errorf("index already exists with ID = %d", indexBuildID) } - meta := &indexpb.IndexMeta{ - State: commonpb.IndexState_Unissued, - IndexBuildID: indexBuildID, - Req: req, + meta := &Meta{ + indexMeta: &indexpb.IndexMeta{ + State: commonpb.IndexState_Unissued, + IndexBuildID: indexBuildID, + Req: req, + NodeServerID: 0, + Version: 0, + }, + revision: 0, } return mt.saveIndexMeta(meta) } -func (mt *metaTable) BuildIndex(indexBuildID UniqueID) error { +func (mt *metaTable) BuildIndex(indexBuildID UniqueID, serverID int64) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("IndexService update index state") + log.Debug("IndexService update index meta") meta, ok := mt.indexBuildID2Meta[indexBuildID] if !ok { return fmt.Errorf("index not exists with ID = %d", indexBuildID) } - if meta.State != commonpb.IndexState_Unissued { - return fmt.Errorf("can not update index state, index with ID = %d state is %d", indexBuildID, meta.State) + if meta.indexMeta.State != commonpb.IndexState_Unissued { + return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State) } - meta.State = commonpb.IndexState_InProgress - return mt.saveIndexMeta(&meta) + meta.indexMeta.NodeServerID = serverID + + err := mt.saveIndexMeta(&meta) + if err != nil { + fn := func() error { + m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) + if m == nil { + return err + } + m.indexMeta.NodeServerID = serverID + return mt.saveIndexMeta(m) + } + err2 := retry.Retry(5, time.Millisecond*200, fn) + if err2 != nil { + return err2 + } + } + + return nil } -func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { +func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() + log.Debug("IndexService update index version") + meta, ok := mt.indexBuildID2Meta[indexBuildID] + if !ok { + return fmt.Errorf("index not exists with ID = %d", indexBuildID) + } - log.Debug("indexservice", zap.Int64("mark index is deleted", indexID)) + if meta.indexMeta.State != commonpb.IndexState_Unissued { + return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State) + } - for indexBuildID, meta := range mt.indexBuildID2Meta { - if meta.Req.IndexID == indexID { - meta.MarkDeleted = true - mt.indexBuildID2Meta[indexBuildID] = meta + meta.indexMeta.Version = meta.indexMeta.Version + 1 + + err := mt.saveIndexMeta(&meta) + if err != nil { + fn := func() error { + m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) + if m == nil { + return err + } + m.indexMeta.Version = m.indexMeta.Version + 1 + + return mt.saveIndexMeta(m) } + err2 := retry.Retry(5, time.Millisecond*200, fn) + return err2 } return nil } -func (mt *metaTable) NotifyBuildIndex(nty *indexpb.NotifyBuildIndexRequest) error { +func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("indexservice", zap.Int64("notify build index", nty.IndexBuildID)) - indexBuildID := nty.IndexBuildID - meta, ok := mt.indexBuildID2Meta[indexBuildID] - if !ok { - return fmt.Errorf("index not exists with ID = %d", indexBuildID) - } + log.Debug("indexservice", zap.Int64("mark index is deleted", indexID)) - if nty.Status.ErrorCode != commonpb.ErrorCode_Success { - meta.State = commonpb.IndexState_Failed - meta.FailReason = nty.Status.Reason - } else { - meta.State = commonpb.IndexState_Finished - meta.IndexFilePaths = nty.IndexFilePaths + for _, meta := range mt.indexBuildID2Meta { + if meta.indexMeta.Req.IndexID == indexID { + meta.indexMeta.MarkDeleted = true + if err := mt.saveIndexMeta(&meta); err != nil { + log.Debug("IndexService", zap.Any("Meta table mark deleted err", err.Error())) + fn := func() error { + m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) + if m == nil { + return err + } + + m.indexMeta.MarkDeleted = true + return mt.saveIndexMeta(m) + } + err2 := retry.Retry(5, time.Millisecond*200, fn) + if err2 != nil { + return err2 + } + } + } } - return mt.saveIndexMeta(&meta) + return nil } func (mt *metaTable) GetIndexState(indexBuildID UniqueID) (*indexpb.IndexInfo, error) { @@ -154,13 +254,13 @@ func (mt *metaTable) GetIndexState(indexBuildID UniqueID) (*indexpb.IndexInfo, e if !ok { return ret, fmt.Errorf("index not exists with ID = %d", indexBuildID) } - if meta.MarkDeleted { + if meta.indexMeta.MarkDeleted { return ret, fmt.Errorf("index not exists with ID = %d", indexBuildID) } - ret.IndexID = meta.Req.IndexID - ret.IndexName = meta.Req.IndexName - ret.Reason = meta.FailReason - ret.State = meta.State + ret.IndexID = meta.indexMeta.Req.IndexID + ret.IndexName = meta.indexMeta.Req.IndexName + ret.Reason = meta.indexMeta.FailReason + ret.State = meta.indexMeta.State return ret, nil } @@ -174,10 +274,10 @@ func (mt *metaTable) GetIndexFilePathInfo(indexBuildID UniqueID) (*indexpb.Index if !ok { return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID) } - if meta.MarkDeleted { + if meta.indexMeta.MarkDeleted { return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID) } - ret.IndexFilePaths = meta.IndexFilePaths + ret.IndexFilePaths = meta.indexMeta.IndexFilePaths return ret, nil } @@ -186,23 +286,227 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) { defer mt.lock.Unlock() delete(mt.indexBuildID2Meta, indexBuildID) + key := "indexes/" + strconv.FormatInt(indexBuildID, 10) + + err := mt.client.Remove(key) + if err != nil { + log.Debug("IndexService", zap.Any("Delete IndexMeta in etcd error", err)) + } } -func (mt *metaTable) getMarkDeleted(limit int) []indexpb.IndexMeta { +func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error { mt.lock.Lock() defer mt.lock.Unlock() - log.Debug("IndexService get mark deleted meta") + meta, ok := mt.indexBuildID2Meta[indexBuildID] + if !ok { + return fmt.Errorf("index not exists with ID = %d", indexBuildID) + } + + meta.indexMeta.Recycled = true + if err := mt.saveIndexMeta(&meta); err != nil { + fn := func() error { + m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) + if m == nil { + return err + } - var indexMetas []indexpb.IndexMeta + m.indexMeta.Recycled = true + return mt.saveIndexMeta(m) + } + err2 := retry.Retry(5, time.Millisecond*200, fn) + if err2 != nil { + return err2 + } + } + + return nil +} + +func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta { + mt.lock.Lock() + defer mt.lock.Unlock() + + var metas []Meta for _, meta := range mt.indexBuildID2Meta { - if meta.MarkDeleted && meta.State == commonpb.IndexState_Finished { - indexMetas = append(indexMetas, meta) + if meta.indexMeta.State == commonpb.IndexState_Finished && (meta.indexMeta.MarkDeleted || !meta.indexMeta.Recycled) { + metas = append(metas, meta) + } + if len(metas) >= limit { + return metas + } + } + + return metas +} + +func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta { + mt.lock.Lock() + defer mt.lock.Unlock() + + meta, ok := mt.indexBuildID2Meta[indexBuildID] + if !ok { + log.Debug("IndexService", zap.Any("Meta table does not have the meta with indexBuildID", indexBuildID)) + } + + return meta +} + +func (mt *metaTable) GetUnassignedTasks(serverIDs []int64) [][]UniqueID { + var tasks [][]UniqueID + var indexBuildIDs []UniqueID + + for indexBuildID, meta := range mt.indexBuildID2Meta { + alive := false + for _, serverID := range serverIDs { + if meta.indexMeta.NodeServerID == serverID { + alive = true + } + } + if !alive { + indexBuildIDs = append(indexBuildIDs, indexBuildID) } - if len(indexMetas) >= limit { - return indexMetas + if len(indexBuildIDs) >= 10 { + tasks = append(tasks, indexBuildIDs) + indexBuildIDs = []UniqueID{} + } + } + tasks = append(tasks, indexBuildIDs) + + return tasks +} + +func compare2Array(arr1, arr2 interface{}) bool { + p1, ok := arr1.([]*commonpb.KeyValuePair) + if ok { + p2, ok1 := arr2.([]*commonpb.KeyValuePair) + if ok1 { + for _, param1 := range p1 { + sameParams := false + for _, param2 := range p2 { + if param1.Key == param2.Key && param1.Value == param2.Value { + sameParams = true + } + } + if !sameParams { + return false + } + } + return true } + log.Error("indexservice", zap.Any("type error", "arr2 type should be commonpb.KeyValuePair")) + return false } + v1, ok2 := arr1.([]string) + if ok2 { + v2, ok3 := arr2.([]string) + if ok3 { + for _, s1 := range v1 { + sameParams := false + for _, s2 := range v2 { + if s1 == s2 { + sameParams = true + } + } + if !sameParams { + return false + } + } + return true + } + log.Error("indexservice", zap.Any("type error", "arr2 type should be string array")) + return false + } + log.Error("indexservice", zap.Any("type error", "param type should be commonpb.KeyValuePair or string array")) + return false +} + +func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) { + mt.lock.Lock() + defer mt.lock.Unlock() + +LOOP: + for _, meta := range mt.indexBuildID2Meta { + if meta.indexMeta.Req.IndexID == req.IndexID { + if len(meta.indexMeta.Req.DataPaths) != len(req.DataPaths) { + goto LOOP + } + if len(meta.indexMeta.Req.IndexParams) == len(req.IndexParams) && + compare2Array(meta.indexMeta.Req.DataPaths, req.DataPaths) { + if !compare2Array(meta.indexMeta.Req.IndexParams, req.IndexParams) || + !compare2Array(meta.indexMeta.Req.TypeParams, req.TypeParams) { + goto LOOP + } + return true, meta.indexMeta.IndexBuildID + } + } + } + return false, -1 +} + +func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool { + mt.lock.Lock() + defer mt.lock.Unlock() + + meta, ok := mt.indexBuildID2Meta[indexBuildID] + if ok { + if meta.revision >= revision { + return false + } + } + + m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID) + if m == nil { + log.Debug("IndexService", zap.Any("Load meta from etcd error", err)) + return false + } + + log.Debug("IndexService", zap.Any("IndexMeta", m)) + mt.indexBuildID2Meta[indexBuildID] = *m + + return true +} + +type nodeTasks struct { + nodeID2Tasks map[int64][]UniqueID +} + +func NewNodeTasks() *nodeTasks { + return &nodeTasks{ + nodeID2Tasks: map[int64][]UniqueID{}, + } +} + +func (nt *nodeTasks) getTasksByLeaseKey(serverID int64) []UniqueID { + indexBuildIDs, ok := nt.nodeID2Tasks[serverID] + if !ok { + return nil + } + return indexBuildIDs +} + +func (nt *nodeTasks) assignTask(serverID int64, indexBuildID UniqueID) { + indexBuildIDs, ok := nt.nodeID2Tasks[serverID] + if !ok { + var IDs []UniqueID + IDs = append(IDs, indexBuildID) + nt.nodeID2Tasks[serverID] = IDs + return + } + indexBuildIDs = append(indexBuildIDs, indexBuildID) + nt.nodeID2Tasks[serverID] = indexBuildIDs +} + +func (nt *nodeTasks) finishTask(indexBuildID UniqueID) { + for serverID := range nt.nodeID2Tasks { + for i, buildID := range nt.nodeID2Tasks[serverID] { + if buildID == indexBuildID { + nt.nodeID2Tasks[serverID] = append(nt.nodeID2Tasks[serverID][:i], nt.nodeID2Tasks[serverID][:i+1]...) + } + } + } +} - return indexMetas +func (nt *nodeTasks) delete(serverID int64) { + delete(nt.nodeID2Tasks, serverID) } diff --git a/internal/indexservice/node_mgr.go b/internal/indexservice/node_mgr.go index 750c5be4b0d475f62330f0bee685dac20f80662a..0131a609076b7ebcf78b9b6e8b4d09ebfb896a02 100644 --- a/internal/indexservice/node_mgr.go +++ b/internal/indexservice/node_mgr.go @@ -53,6 +53,7 @@ func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest value: nodeClient, key: nodeID, addr: req.Address, + serverID: req.ServerID, priority: 0, } i.nodeClients.Push(item) diff --git a/internal/indexservice/priority_queue.go b/internal/indexservice/priority_queue.go index dce7e6ed0da0c70975937200659c66a7715ebe48..571ad55ff4ac2842f4d928b2c505272a39bc22e3 100644 --- a/internal/indexservice/priority_queue.go +++ b/internal/indexservice/priority_queue.go @@ -21,9 +21,12 @@ import ( // An Item is something we manage in a priority queue. type PQItem struct { - value types.IndexNode // The value of the item; arbitrary. - key UniqueID - addr *commonpb.Address + value types.IndexNode // The value of the item; arbitrary. + key UniqueID + addr *commonpb.Address + + serverID int64 + priority int // The priority of the item in the queue. // The index is needed by update and is maintained by the heap.Interface methods. index int // The index of the item in the heap. @@ -135,12 +138,13 @@ func (pq *PriorityQueue) Peek() interface{} { //return item.value } -func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode) { +// PeekClient picks an IndexNode with the lowest load. +func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode, int64) { item := pq.Peek() if item == nil { - return UniqueID(-1), nil + return UniqueID(-1), nil, 0 } - return item.(*PQItem).key, item.(*PQItem).value + return item.(*PQItem).key, item.(*PQItem).value, item.(*PQItem).serverID } func (pq *PriorityQueue) PeekAllClients() []types.IndexNode { diff --git a/internal/indexservice/task.go b/internal/indexservice/task.go index f23a909ba7590decf95b337df2289d3c026d8c0d..44b2e5dba904f9714a00d718f30dbf86be6a9db1 100644 --- a/internal/indexservice/task.go +++ b/internal/indexservice/task.go @@ -15,12 +15,11 @@ import ( "context" "errors" + "github.com/milvus-io/milvus/internal/log" "go.uber.org/zap" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/kv" - "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" ) @@ -108,36 +107,16 @@ func (it *IndexAddTask) OnEnqueue() error { } func (it *IndexAddTask) PreExecute(ctx context.Context) error { - log.Debug("pretend to check Index Req") - nodeID, builderClient := it.nodeClients.PeekClient() - if builderClient == nil { - return errors.New("IndexAddTask Service not available") - } - it.builderClient = builderClient - it.buildClientNodeID = nodeID - err := it.table.AddIndex(it.indexBuildID, it.req) - if err != nil { - return err - } + it.req.IndexBuildID = it.indexBuildID return nil } func (it *IndexAddTask) Execute(ctx context.Context) error { - it.req.IndexBuildID = it.indexBuildID - log.Debug("before index ...") - resp, err := it.builderClient.BuildIndex(ctx, it.req) - if err != nil { - log.Debug("indexservice", zap.String("build index finish err", err.Error())) - return err - } - if resp.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(resp.Reason) - } - err = it.table.BuildIndex(it.indexBuildID) + log.Debug("IndexService", zap.Any("BuildIndex, IndexBuildID = ", it.indexBuildID)) + err := it.table.AddIndex(it.indexBuildID, it.req) if err != nil { return err } - it.nodeClients.IncPriority(it.buildClientNodeID, 1) return nil } diff --git a/internal/kv/etcd/etcd_kv.go b/internal/kv/etcd/etcd_kv.go index af3e6fdf06ce74d858aa38748e96ec17d0aeaed2..09a486ab65175dbae0034409d7bf951d2d7b39d3 100644 --- a/internal/kv/etcd/etcd_kv.go +++ b/internal/kv/etcd/etcd_kv.go @@ -68,6 +68,27 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) { return keys, values, nil } +func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) { + key = path.Join(kv.rootPath, key) + log.Debug("LoadWithPrefix ", zap.String("prefix", key)) + ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) + defer cancel() + resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + if err != nil { + return nil, nil, nil, err + } + keys := make([]string, 0, resp.Count) + values := make([]string, 0, resp.Count) + versions := make([]int64, 0, resp.Count) + for _, kv := range resp.Kvs { + keys = append(keys, string(kv.Key)) + values = append(values, string(kv.Value)) + versions = append(versions, kv.Version) + } + return keys, values, versions, nil +} + func (kv *EtcdKV) Load(key string) (string, error) { key = path.Join(kv.rootPath, key) ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout) diff --git a/internal/proto/index_service.proto b/internal/proto/index_service.proto index 4706d86307edcdbd9bf4867e63862f9654373844..e9d01550cd1ea9c7db7a4ed32973e4af17a89d56 100644 --- a/internal/proto/index_service.proto +++ b/internal/proto/index_service.proto @@ -16,7 +16,6 @@ service IndexService { rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){} rpc GetIndexStates(GetIndexStatesRequest) returns (GetIndexStatesResponse) {} rpc GetIndexFilePaths(GetIndexFilePathsRequest) returns (GetIndexFilePathsResponse){} - rpc NotifyBuildIndex(NotifyBuildIndexRequest) returns (common.Status) {} rpc DropIndex(DropIndexRequest) returns (common.Status) {} } @@ -24,13 +23,13 @@ service IndexNode { rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {} rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {} rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){} - rpc BuildIndex(BuildIndexRequest) returns (common.Status){} - rpc DropIndex(DropIndexRequest) returns (common.Status) {} + rpc CreateIndex(CreateIndexRequest) returns (common.Status){} } message RegisterNodeRequest { common.MsgBase base = 1; common.Address address = 2; + int64 serverID = 3; } message RegisterNodeResponse { @@ -55,25 +54,29 @@ message GetIndexStatesResponse { repeated IndexInfo states = 2; } -message BuildIndexRequest { +message CreateIndexRequest { int64 indexBuildID = 1; string index_name = 2; int64 indexID = 3; - repeated string data_paths = 4; - repeated common.KeyValuePair type_params = 5; - repeated common.KeyValuePair index_params = 6; + int64 version = 4; + string meta_path = 5; + repeated string data_paths = 6; + repeated common.KeyValuePair type_params = 7; + repeated common.KeyValuePair index_params = 8; } -message BuildIndexResponse { - common.Status status = 1; - int64 indexBuildID = 2; +message BuildIndexRequest { + int64 indexBuildID = 1; + string index_name = 2; + int64 indexID = 3; + repeated string data_paths = 5; + repeated common.KeyValuePair type_params = 6; + repeated common.KeyValuePair index_params = 7; } -message NotifyBuildIndexRequest { +message BuildIndexResponse { common.Status status = 1; int64 indexBuildID = 2; - repeated string index_file_paths = 3; - int64 nodeID = 4; } message GetIndexFilePathsRequest { @@ -98,6 +101,9 @@ message IndexMeta { BuildIndexRequest req = 4; repeated string index_file_paths = 5; bool mark_deleted = 6; + int64 node_serverID = 7; + int64 version = 8; + bool recycled = 9; } message DropIndexRequest { diff --git a/internal/proto/indexpb/index_service.pb.go b/internal/proto/indexpb/index_service.pb.go index 96891ba3781021f9dd08c62fa9f6addb0204cbd9..9a38aa672a761a14ceaa77550b7dedd864af0f3d 100644 --- a/internal/proto/indexpb/index_service.pb.go +++ b/internal/proto/indexpb/index_service.pb.go @@ -30,6 +30,7 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package type RegisterNodeRequest struct { Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` Address *commonpb.Address `protobuf:"bytes,2,opt,name=address,proto3" json:"address,omitempty"` + ServerID int64 `protobuf:"varint,3,opt,name=serverID,proto3" json:"serverID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -74,6 +75,13 @@ func (m *RegisterNodeRequest) GetAddress() *commonpb.Address { return nil } +func (m *RegisterNodeRequest) GetServerID() int64 { + if m != nil { + return m.ServerID + } + return 0 +} + type RegisterNodeResponse struct { Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` InitParams *internalpb.InitParams `protobuf:"bytes,2,opt,name=init_params,json=initParams,proto3" json:"init_params,omitempty"` @@ -278,13 +286,108 @@ func (m *GetIndexStatesResponse) GetStates() []*IndexInfo { return nil } +type CreateIndexRequest struct { + IndexBuildID int64 `protobuf:"varint,1,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"` + IndexName string `protobuf:"bytes,2,opt,name=index_name,json=indexName,proto3" json:"index_name,omitempty"` + IndexID int64 `protobuf:"varint,3,opt,name=indexID,proto3" json:"indexID,omitempty"` + Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` + MetaPath string `protobuf:"bytes,5,opt,name=meta_path,json=metaPath,proto3" json:"meta_path,omitempty"` + DataPaths []string `protobuf:"bytes,6,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` + TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,7,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` + IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,8,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CreateIndexRequest) Reset() { *m = CreateIndexRequest{} } +func (m *CreateIndexRequest) String() string { return proto.CompactTextString(m) } +func (*CreateIndexRequest) ProtoMessage() {} +func (*CreateIndexRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_a5d2036b4df73e0a, []int{5} +} + +func (m *CreateIndexRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CreateIndexRequest.Unmarshal(m, b) +} +func (m *CreateIndexRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CreateIndexRequest.Marshal(b, m, deterministic) +} +func (m *CreateIndexRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CreateIndexRequest.Merge(m, src) +} +func (m *CreateIndexRequest) XXX_Size() int { + return xxx_messageInfo_CreateIndexRequest.Size(m) +} +func (m *CreateIndexRequest) XXX_DiscardUnknown() { + xxx_messageInfo_CreateIndexRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_CreateIndexRequest proto.InternalMessageInfo + +func (m *CreateIndexRequest) GetIndexBuildID() int64 { + if m != nil { + return m.IndexBuildID + } + return 0 +} + +func (m *CreateIndexRequest) GetIndexName() string { + if m != nil { + return m.IndexName + } + return "" +} + +func (m *CreateIndexRequest) GetIndexID() int64 { + if m != nil { + return m.IndexID + } + return 0 +} + +func (m *CreateIndexRequest) GetVersion() int64 { + if m != nil { + return m.Version + } + return 0 +} + +func (m *CreateIndexRequest) GetMetaPath() string { + if m != nil { + return m.MetaPath + } + return "" +} + +func (m *CreateIndexRequest) GetDataPaths() []string { + if m != nil { + return m.DataPaths + } + return nil +} + +func (m *CreateIndexRequest) GetTypeParams() []*commonpb.KeyValuePair { + if m != nil { + return m.TypeParams + } + return nil +} + +func (m *CreateIndexRequest) GetIndexParams() []*commonpb.KeyValuePair { + if m != nil { + return m.IndexParams + } + return nil +} + type BuildIndexRequest struct { IndexBuildID int64 `protobuf:"varint,1,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"` IndexName string `protobuf:"bytes,2,opt,name=index_name,json=indexName,proto3" json:"index_name,omitempty"` IndexID int64 `protobuf:"varint,3,opt,name=indexID,proto3" json:"indexID,omitempty"` - DataPaths []string `protobuf:"bytes,4,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` - TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,5,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` - IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` + DataPaths []string `protobuf:"bytes,5,rep,name=data_paths,json=dataPaths,proto3" json:"data_paths,omitempty"` + TypeParams []*commonpb.KeyValuePair `protobuf:"bytes,6,rep,name=type_params,json=typeParams,proto3" json:"type_params,omitempty"` + IndexParams []*commonpb.KeyValuePair `protobuf:"bytes,7,rep,name=index_params,json=indexParams,proto3" json:"index_params,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -294,7 +397,7 @@ func (m *BuildIndexRequest) Reset() { *m = BuildIndexRequest{} } func (m *BuildIndexRequest) String() string { return proto.CompactTextString(m) } func (*BuildIndexRequest) ProtoMessage() {} func (*BuildIndexRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{5} + return fileDescriptor_a5d2036b4df73e0a, []int{6} } func (m *BuildIndexRequest) XXX_Unmarshal(b []byte) error { @@ -369,7 +472,7 @@ func (m *BuildIndexResponse) Reset() { *m = BuildIndexResponse{} } func (m *BuildIndexResponse) String() string { return proto.CompactTextString(m) } func (*BuildIndexResponse) ProtoMessage() {} func (*BuildIndexResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{6} + return fileDescriptor_a5d2036b4df73e0a, []int{7} } func (m *BuildIndexResponse) XXX_Unmarshal(b []byte) error { @@ -404,69 +507,6 @@ func (m *BuildIndexResponse) GetIndexBuildID() int64 { return 0 } -type NotifyBuildIndexRequest struct { - Status *commonpb.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` - IndexBuildID int64 `protobuf:"varint,2,opt,name=indexBuildID,proto3" json:"indexBuildID,omitempty"` - IndexFilePaths []string `protobuf:"bytes,3,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` - NodeID int64 `protobuf:"varint,4,opt,name=nodeID,proto3" json:"nodeID,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *NotifyBuildIndexRequest) Reset() { *m = NotifyBuildIndexRequest{} } -func (m *NotifyBuildIndexRequest) String() string { return proto.CompactTextString(m) } -func (*NotifyBuildIndexRequest) ProtoMessage() {} -func (*NotifyBuildIndexRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_a5d2036b4df73e0a, []int{7} -} - -func (m *NotifyBuildIndexRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_NotifyBuildIndexRequest.Unmarshal(m, b) -} -func (m *NotifyBuildIndexRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_NotifyBuildIndexRequest.Marshal(b, m, deterministic) -} -func (m *NotifyBuildIndexRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_NotifyBuildIndexRequest.Merge(m, src) -} -func (m *NotifyBuildIndexRequest) XXX_Size() int { - return xxx_messageInfo_NotifyBuildIndexRequest.Size(m) -} -func (m *NotifyBuildIndexRequest) XXX_DiscardUnknown() { - xxx_messageInfo_NotifyBuildIndexRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_NotifyBuildIndexRequest proto.InternalMessageInfo - -func (m *NotifyBuildIndexRequest) GetStatus() *commonpb.Status { - if m != nil { - return m.Status - } - return nil -} - -func (m *NotifyBuildIndexRequest) GetIndexBuildID() int64 { - if m != nil { - return m.IndexBuildID - } - return 0 -} - -func (m *NotifyBuildIndexRequest) GetIndexFilePaths() []string { - if m != nil { - return m.IndexFilePaths - } - return nil -} - -func (m *NotifyBuildIndexRequest) GetNodeID() int64 { - if m != nil { - return m.NodeID - } - return 0 -} - type GetIndexFilePathsRequest struct { IndexBuildIDs []int64 `protobuf:"varint,1,rep,packed,name=indexBuildIDs,proto3" json:"indexBuildIDs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -615,6 +655,9 @@ type IndexMeta struct { Req *BuildIndexRequest `protobuf:"bytes,4,opt,name=req,proto3" json:"req,omitempty"` IndexFilePaths []string `protobuf:"bytes,5,rep,name=index_file_paths,json=indexFilePaths,proto3" json:"index_file_paths,omitempty"` MarkDeleted bool `protobuf:"varint,6,opt,name=mark_deleted,json=markDeleted,proto3" json:"mark_deleted,omitempty"` + NodeServerID int64 `protobuf:"varint,7,opt,name=node_serverID,json=nodeServerID,proto3" json:"node_serverID,omitempty"` + Version int64 `protobuf:"varint,8,opt,name=version,proto3" json:"version,omitempty"` + Recycled bool `protobuf:"varint,9,opt,name=recycled,proto3" json:"recycled,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -687,6 +730,27 @@ func (m *IndexMeta) GetMarkDeleted() bool { return false } +func (m *IndexMeta) GetNodeServerID() int64 { + if m != nil { + return m.NodeServerID + } + return 0 +} + +func (m *IndexMeta) GetVersion() int64 { + if m != nil { + return m.Version + } + return 0 +} + +func (m *IndexMeta) GetRecycled() bool { + if m != nil { + return m.Recycled + } + return false +} + type DropIndexRequest struct { IndexID int64 `protobuf:"varint,1,opt,name=indexID,proto3" json:"indexID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -732,9 +796,9 @@ func init() { proto.RegisterType((*GetIndexStatesRequest)(nil), "milvus.proto.index.GetIndexStatesRequest") proto.RegisterType((*IndexInfo)(nil), "milvus.proto.index.IndexInfo") proto.RegisterType((*GetIndexStatesResponse)(nil), "milvus.proto.index.GetIndexStatesResponse") + proto.RegisterType((*CreateIndexRequest)(nil), "milvus.proto.index.CreateIndexRequest") proto.RegisterType((*BuildIndexRequest)(nil), "milvus.proto.index.BuildIndexRequest") proto.RegisterType((*BuildIndexResponse)(nil), "milvus.proto.index.BuildIndexResponse") - proto.RegisterType((*NotifyBuildIndexRequest)(nil), "milvus.proto.index.NotifyBuildIndexRequest") proto.RegisterType((*GetIndexFilePathsRequest)(nil), "milvus.proto.index.GetIndexFilePathsRequest") proto.RegisterType((*IndexFilePathInfo)(nil), "milvus.proto.index.IndexFilePathInfo") proto.RegisterType((*GetIndexFilePathsResponse)(nil), "milvus.proto.index.GetIndexFilePathsResponse") @@ -745,65 +809,69 @@ func init() { func init() { proto.RegisterFile("index_service.proto", fileDescriptor_a5d2036b4df73e0a) } var fileDescriptor_a5d2036b4df73e0a = []byte{ - // 914 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xcd, 0x6f, 0x1b, 0x45, - 0x14, 0xcf, 0x7a, 0x13, 0x07, 0x3f, 0x9b, 0x28, 0x99, 0x96, 0xb0, 0x18, 0xaa, 0x26, 0x4b, 0x01, - 0x03, 0xad, 0x53, 0xb9, 0x14, 0x4e, 0x48, 0x90, 0x5a, 0x44, 0x16, 0x6a, 0x14, 0x4d, 0x2b, 0x0e, - 0x48, 0x60, 0x8d, 0xbd, 0xcf, 0xce, 0xa8, 0xfb, 0x95, 0x9d, 0x71, 0x45, 0x4e, 0x5c, 0xe0, 0x88, - 0xb8, 0x21, 0xf1, 0x7f, 0x20, 0xf5, 0xdf, 0x43, 0x3b, 0x33, 0xbb, 0xf5, 0xae, 0xd7, 0xae, 0x43, - 0xf8, 0xb8, 0x70, 0xdb, 0x79, 0xf3, 0x7b, 0xdf, 0xbf, 0xf7, 0x66, 0xe1, 0x06, 0x0f, 0x3d, 0xfc, - 0x61, 0x28, 0x30, 0x79, 0xce, 0xc7, 0xd8, 0x8d, 0x93, 0x48, 0x46, 0x84, 0x04, 0xdc, 0x7f, 0x3e, - 0x13, 0xfa, 0xd4, 0x55, 0x88, 0x76, 0x6b, 0x1c, 0x05, 0x41, 0x14, 0x6a, 0x59, 0x7b, 0x87, 0x87, - 0x12, 0x93, 0x90, 0xf9, 0xe6, 0xdc, 0x9a, 0xd7, 0x70, 0x7f, 0x84, 0x1b, 0x14, 0xa7, 0x5c, 0x48, - 0x4c, 0x4e, 0x23, 0x0f, 0x29, 0x5e, 0xcc, 0x50, 0x48, 0x72, 0x1f, 0x36, 0x47, 0x4c, 0xa0, 0x63, - 0x1d, 0x58, 0x9d, 0x66, 0xef, 0x9d, 0x6e, 0xc1, 0x8b, 0x31, 0xff, 0x58, 0x4c, 0x8f, 0x99, 0x40, - 0xaa, 0x90, 0xe4, 0x53, 0xd8, 0x66, 0x9e, 0x97, 0xa0, 0x10, 0x4e, 0x6d, 0x85, 0xd2, 0x97, 0x1a, - 0x43, 0x33, 0xb0, 0xfb, 0xab, 0x05, 0x37, 0x8b, 0x11, 0x88, 0x38, 0x0a, 0x05, 0x92, 0x07, 0x50, - 0x17, 0x92, 0xc9, 0x99, 0x30, 0x41, 0xbc, 0x5d, 0x69, 0xef, 0x89, 0x82, 0x50, 0x03, 0x25, 0xc7, - 0xd0, 0xe4, 0x21, 0x97, 0xc3, 0x98, 0x25, 0x2c, 0xc8, 0x22, 0x39, 0xec, 0x96, 0x8a, 0x64, 0xea, - 0x31, 0x08, 0xb9, 0x3c, 0x53, 0x40, 0x0a, 0x3c, 0xff, 0x76, 0x3f, 0x87, 0x37, 0x4e, 0x50, 0x0e, - 0xd2, 0x52, 0xa6, 0xd6, 0x51, 0x64, 0x45, 0xb9, 0x03, 0xaf, 0xab, 0x02, 0x1f, 0xcf, 0xb8, 0xef, - 0x0d, 0xfa, 0x69, 0x60, 0x76, 0xc7, 0xa6, 0x45, 0xa1, 0xfb, 0xc2, 0x82, 0x86, 0x52, 0x1e, 0x84, - 0x93, 0x88, 0x3c, 0x84, 0xad, 0x34, 0x34, 0x5d, 0xc9, 0x9d, 0xde, 0xed, 0xca, 0x24, 0x5e, 0xfa, - 0xa2, 0x1a, 0x4d, 0x5c, 0x68, 0xcd, 0x5b, 0x55, 0x89, 0xd8, 0xb4, 0x20, 0x23, 0x0e, 0x6c, 0xab, - 0xf3, 0xa0, 0xef, 0xd8, 0xea, 0x3a, 0x3b, 0x92, 0x5b, 0x00, 0x9a, 0x2b, 0x21, 0x0b, 0xd0, 0xd9, - 0x3c, 0xb0, 0x3a, 0x0d, 0xda, 0x50, 0x92, 0x53, 0x16, 0x20, 0xd9, 0x87, 0x7a, 0x82, 0x4c, 0x44, - 0xa1, 0xb3, 0xa5, 0xae, 0xcc, 0xc9, 0xfd, 0xc9, 0x82, 0xfd, 0x72, 0xe6, 0xd7, 0x69, 0xc6, 0x43, - 0xad, 0x84, 0x69, 0x1f, 0xec, 0x4e, 0xb3, 0x77, 0xab, 0xbb, 0x48, 0xd6, 0x6e, 0x5e, 0x2a, 0x6a, - 0xc0, 0xee, 0xef, 0x35, 0xd8, 0xd3, 0x39, 0xa6, 0x57, 0x59, 0xf1, 0xcb, 0x15, 0xb1, 0x2a, 0x2a, - 0x52, 0xcc, 0xbb, 0x56, 0xce, 0x7b, 0x65, 0xc1, 0x3c, 0x26, 0xd9, 0x30, 0x66, 0xf2, 0x5c, 0x38, - 0x9b, 0x07, 0x76, 0xaa, 0x98, 0x4a, 0xce, 0x52, 0x41, 0xca, 0x2a, 0x79, 0x19, 0x63, 0xc6, 0xaa, - 0x2d, 0x95, 0xcd, 0x61, 0x65, 0x09, 0xbe, 0xc6, 0xcb, 0x6f, 0x98, 0x3f, 0xc3, 0x33, 0xc6, 0x13, - 0x0a, 0xa9, 0x96, 0x66, 0x15, 0xe9, 0x9b, 0xf8, 0x33, 0x23, 0xf5, 0x75, 0x8d, 0x34, 0x95, 0x9a, - 0xe1, 0x66, 0x00, 0x64, 0xbe, 0x34, 0xd7, 0xe9, 0xce, 0x1a, 0x14, 0x73, 0xff, 0xb0, 0xe0, 0xcd, - 0xd3, 0x48, 0xf2, 0xc9, 0xe5, 0x62, 0x43, 0xfe, 0x29, 0xa7, 0xa4, 0x03, 0xbb, 0xba, 0x52, 0x13, - 0xee, 0xa3, 0x69, 0x89, 0xad, 0x5a, 0xb2, 0xa3, 0xe4, 0x5f, 0x71, 0x1f, 0x75, 0x5f, 0xf6, 0xa1, - 0x1e, 0x46, 0x1e, 0x0e, 0xfa, 0x8a, 0xe3, 0x36, 0x35, 0x27, 0xf7, 0x0b, 0x70, 0x32, 0x1e, 0xe7, - 0xe0, 0xab, 0x0d, 0xf1, 0x6f, 0x16, 0xec, 0x15, 0xf4, 0xd5, 0x30, 0xff, 0xf7, 0x29, 0xa7, 0x81, - 0xbd, 0x55, 0x91, 0xdb, 0x75, 0x88, 0xd0, 0x07, 0x98, 0x73, 0xab, 0x47, 0xf5, 0xbd, 0xa5, 0xa3, - 0x3a, 0x5f, 0x10, 0xda, 0x98, 0xe4, 0x81, 0xfd, 0x52, 0x33, 0x6b, 0xef, 0x31, 0x4a, 0xb6, 0xd6, - 0xb4, 0xe6, 0xab, 0xb1, 0x76, 0xa5, 0xd5, 0x78, 0x1b, 0x9a, 0x13, 0xc6, 0xfd, 0xa1, 0x59, 0x61, - 0xb6, 0x9a, 0x72, 0x48, 0x45, 0x54, 0x49, 0xc8, 0x67, 0x60, 0x27, 0x78, 0xa1, 0x28, 0xb1, 0x24, - 0x91, 0x05, 0x32, 0xd3, 0x54, 0xa3, 0xb2, 0x0b, 0x5b, 0x95, 0xc4, 0x3b, 0x84, 0x56, 0xc0, 0x92, - 0x67, 0x43, 0x0f, 0x7d, 0x94, 0xe8, 0x39, 0xf5, 0x03, 0xab, 0xf3, 0x1a, 0x6d, 0xa6, 0xb2, 0xbe, - 0x16, 0xb9, 0x77, 0x61, 0xb7, 0x9f, 0x44, 0x71, 0x61, 0x64, 0xe6, 0x16, 0x90, 0x55, 0x58, 0x40, - 0xbd, 0x17, 0xdb, 0xd0, 0xd2, 0xa9, 0xea, 0xd7, 0x9d, 0xc4, 0x40, 0x4e, 0x50, 0x3e, 0x8a, 0x82, - 0x38, 0x0a, 0x31, 0x94, 0x7a, 0x1d, 0x93, 0xfb, 0x4b, 0x5e, 0xb2, 0x45, 0xa8, 0x71, 0xd9, 0x7e, - 0x7f, 0x89, 0x46, 0x09, 0xee, 0x6e, 0x90, 0x40, 0x79, 0x7c, 0xca, 0x03, 0x7c, 0xca, 0xc7, 0xcf, - 0x1e, 0x9d, 0xb3, 0x30, 0x44, 0x7f, 0x95, 0xc7, 0x12, 0x34, 0xf3, 0xf8, 0x6e, 0x51, 0xc3, 0x1c, - 0x9e, 0xc8, 0x84, 0x87, 0xd3, 0x8c, 0xa8, 0xee, 0x06, 0xb9, 0x80, 0x9b, 0x27, 0xa8, 0xbc, 0x73, - 0x21, 0xf9, 0x58, 0x64, 0x0e, 0x7b, 0xcb, 0x1d, 0x2e, 0x80, 0xaf, 0xe8, 0x72, 0x0c, 0xad, 0xf9, - 0x3f, 0x0d, 0xf2, 0x41, 0x15, 0x37, 0x2a, 0xfe, 0x86, 0xda, 0x9d, 0x57, 0x03, 0x73, 0x27, 0xdf, - 0x01, 0xbc, 0xa4, 0x17, 0x59, 0x8f, 0x7e, 0x8b, 0x5d, 0x2a, 0xc3, 0x72, 0xf3, 0x1c, 0x76, 0x8a, - 0x4f, 0x34, 0xf9, 0xb0, 0x4a, 0xb7, 0xf2, 0x07, 0xa6, 0xfd, 0xd1, 0x3a, 0xd0, 0xdc, 0x55, 0x02, - 0x7b, 0x0b, 0x9b, 0x86, 0xdc, 0x5d, 0x65, 0xa2, 0xbc, 0x6c, 0xdb, 0xf7, 0xd6, 0x44, 0xe7, 0x3e, - 0xbf, 0x87, 0xdd, 0xf2, 0x7b, 0x43, 0x3e, 0xae, 0x32, 0xb2, 0xe4, 0x55, 0x6a, 0xaf, 0xda, 0x78, - 0xee, 0x06, 0x39, 0x83, 0x46, 0x3e, 0x95, 0xe4, 0x4e, 0x95, 0xe1, 0xf2, 0xd0, 0xbe, 0xc2, 0x62, - 0xef, 0xe7, 0x4d, 0xb3, 0xf7, 0x14, 0xa5, 0xfe, 0x1f, 0xdb, 0xbf, 0x7f, 0x6c, 0xe9, 0x5f, 0x99, - 0xa8, 0x7f, 0x9b, 0x07, 0xc7, 0x9f, 0x7c, 0xdb, 0x9b, 0x72, 0x79, 0x3e, 0x1b, 0xa5, 0x37, 0x47, - 0x1a, 0x7a, 0x8f, 0x47, 0xe6, 0xeb, 0x28, 0x2b, 0xc8, 0x91, 0xd2, 0x3e, 0x52, 0x3e, 0xe2, 0xd1, - 0xa8, 0xae, 0x8e, 0x0f, 0xfe, 0x0c, 0x00, 0x00, 0xff, 0xff, 0x2d, 0x81, 0x5b, 0xf5, 0xdc, 0x0d, - 0x00, 0x00, + // 986 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xec, 0x56, 0xdf, 0x6e, 0x1b, 0xc5, + 0x17, 0xce, 0x7a, 0x1b, 0xff, 0x39, 0x76, 0xa3, 0x66, 0xda, 0x5f, 0xb5, 0x3f, 0x97, 0xaa, 0xce, + 0xb6, 0x80, 0x41, 0xad, 0x53, 0xb9, 0x14, 0xae, 0x90, 0x20, 0xb1, 0x88, 0x2c, 0xd4, 0x2a, 0x9a, + 0x44, 0x5c, 0x20, 0x21, 0x6b, 0xe2, 0x3d, 0x49, 0x46, 0xdd, 0x3f, 0xce, 0xce, 0x38, 0x22, 0xf7, + 0xdc, 0x73, 0x87, 0x84, 0x78, 0x0e, 0xc4, 0x73, 0x70, 0xc5, 0x2b, 0xf0, 0x18, 0x68, 0x66, 0x67, + 0xb7, 0xbb, 0xeb, 0x75, 0xe2, 0x90, 0xc2, 0x15, 0x77, 0x7b, 0xce, 0x9c, 0x33, 0xdf, 0x9c, 0x6f, + 0xce, 0xf9, 0x76, 0xe0, 0x2e, 0x0f, 0x3d, 0xfc, 0x7e, 0x22, 0x30, 0x3e, 0xe7, 0x53, 0x1c, 0xcc, + 0xe2, 0x48, 0x46, 0x84, 0x04, 0xdc, 0x3f, 0x9f, 0x8b, 0xc4, 0x1a, 0xe8, 0x88, 0x6e, 0x67, 0x1a, + 0x05, 0x41, 0x14, 0x26, 0xbe, 0xee, 0x06, 0x0f, 0x25, 0xc6, 0x21, 0xf3, 0x8d, 0xdd, 0xc9, 0x67, + 0xb8, 0xbf, 0x58, 0x70, 0x97, 0xe2, 0x09, 0x17, 0x12, 0xe3, 0xd7, 0x91, 0x87, 0x14, 0xcf, 0xe6, + 0x28, 0x24, 0x79, 0x0e, 0xb7, 0x8e, 0x98, 0x40, 0xc7, 0xea, 0x59, 0xfd, 0xf6, 0xf0, 0xbd, 0x41, + 0x01, 0xc6, 0xec, 0xff, 0x4a, 0x9c, 0xec, 0x30, 0x81, 0x54, 0x47, 0x92, 0x4f, 0xa1, 0xc1, 0x3c, + 0x2f, 0x46, 0x21, 0x9c, 0xda, 0x25, 0x49, 0x5f, 0x26, 0x31, 0x34, 0x0d, 0x26, 0x5d, 0x68, 0xaa, + 0x92, 0x30, 0x1e, 0x8f, 0x1c, 0xbb, 0x67, 0xf5, 0x6d, 0x9a, 0xd9, 0xee, 0x8f, 0x16, 0xdc, 0x2b, + 0x9e, 0x4e, 0xcc, 0xa2, 0x50, 0x20, 0x79, 0x01, 0x75, 0x21, 0x99, 0x9c, 0x0b, 0x73, 0xc0, 0x07, + 0x95, 0x58, 0x07, 0x3a, 0x84, 0x9a, 0x50, 0xb2, 0x03, 0x6d, 0x1e, 0x72, 0x39, 0x99, 0xb1, 0x98, + 0x05, 0xe9, 0x29, 0xb7, 0x06, 0x25, 0x06, 0x0d, 0x59, 0xe3, 0x90, 0xcb, 0x7d, 0x1d, 0x48, 0x81, + 0x67, 0xdf, 0xee, 0xe7, 0xf0, 0xbf, 0x3d, 0x94, 0x63, 0xc5, 0xb3, 0xda, 0x1d, 0x45, 0x4a, 0xd8, + 0x13, 0xb8, 0xad, 0xd9, 0xdf, 0x99, 0x73, 0xdf, 0x1b, 0x8f, 0xd4, 0xc1, 0xec, 0xbe, 0x4d, 0x8b, + 0x4e, 0xf7, 0x37, 0x0b, 0x5a, 0x3a, 0x79, 0x1c, 0x1e, 0x47, 0xe4, 0x25, 0xac, 0xab, 0xa3, 0x25, + 0x2c, 0x6f, 0x0c, 0x1f, 0x55, 0x16, 0xf1, 0x16, 0x8b, 0x26, 0xd1, 0xc4, 0x85, 0x4e, 0x7e, 0x57, + 0x5d, 0x88, 0x4d, 0x0b, 0x3e, 0xe2, 0x40, 0x43, 0xdb, 0x19, 0xa9, 0xa9, 0x49, 0x1e, 0x02, 0x24, + 0x8d, 0x14, 0xb2, 0x00, 0x9d, 0x5b, 0x3d, 0xab, 0xdf, 0xa2, 0x2d, 0xed, 0x79, 0xcd, 0x02, 0x24, + 0xf7, 0xa1, 0x1e, 0x23, 0x13, 0x51, 0xe8, 0xac, 0xeb, 0x25, 0x63, 0xb9, 0x3f, 0x58, 0x70, 0xbf, + 0x5c, 0xf9, 0x4d, 0x2e, 0xe3, 0x65, 0x92, 0x84, 0xea, 0x1e, 0xec, 0x7e, 0x7b, 0xf8, 0x70, 0xb0, + 0xd8, 0xc9, 0x83, 0x8c, 0x2a, 0x6a, 0x82, 0xdd, 0xdf, 0x6b, 0x40, 0x76, 0x63, 0x64, 0x12, 0xf5, + 0x5a, 0xca, 0x7e, 0x99, 0x12, 0xab, 0x82, 0x92, 0x62, 0xe1, 0xb5, 0x72, 0xe1, 0xcb, 0x19, 0x73, + 0xa0, 0x71, 0x8e, 0xb1, 0xe0, 0x51, 0xa8, 0xe9, 0xb2, 0x69, 0x6a, 0x92, 0x07, 0xd0, 0x0a, 0x50, + 0xb2, 0xc9, 0x8c, 0xc9, 0x53, 0xc3, 0x57, 0x53, 0x39, 0xf6, 0x99, 0x3c, 0x55, 0x78, 0x1e, 0x33, + 0x8b, 0xc2, 0xa9, 0xf7, 0x6c, 0x85, 0xa7, 0x3c, 0x6a, 0x55, 0x77, 0xa3, 0xbc, 0x98, 0x61, 0xda, + 0x8d, 0x0d, 0xcd, 0xc2, 0x56, 0x25, 0x75, 0x5f, 0xe3, 0xc5, 0x37, 0xcc, 0x9f, 0xe3, 0x3e, 0xe3, + 0x31, 0x05, 0x95, 0x95, 0x74, 0x23, 0x19, 0x99, 0xb2, 0xd3, 0x4d, 0x9a, 0xab, 0x6e, 0xd2, 0xd6, + 0x69, 0xa6, 0xa7, 0x7f, 0xae, 0xc1, 0x66, 0x42, 0xd2, 0xbf, 0x46, 0x69, 0x91, 0x9b, 0xf5, 0x2b, + 0xb8, 0xa9, 0xbf, 0x0b, 0x6e, 0x1a, 0x7f, 0x8b, 0x9b, 0x00, 0x48, 0x9e, 0x9a, 0x9b, 0x74, 0xfc, + 0x0a, 0x63, 0xeb, 0x7e, 0x01, 0x4e, 0x3a, 0x64, 0x5f, 0x71, 0x1f, 0x35, 0x1b, 0xd7, 0x53, 0x98, + 0x9f, 0x2c, 0xd8, 0x2c, 0xe4, 0x6b, 0xa5, 0xf9, 0xa7, 0x0e, 0x4c, 0xfa, 0x70, 0x27, 0x61, 0xf9, + 0x98, 0xfb, 0x68, 0xae, 0xd3, 0xd6, 0xd7, 0xb9, 0xc1, 0x0b, 0x55, 0xa8, 0x83, 0xfd, 0xbf, 0xa2, + 0xb6, 0x9b, 0x30, 0x3a, 0x02, 0xc8, 0xc1, 0x26, 0x3a, 0xf2, 0xfe, 0x52, 0x1d, 0xc9, 0x13, 0x42, + 0x5b, 0xc7, 0xd9, 0xc1, 0xfe, 0xac, 0x19, 0x4d, 0x7e, 0x85, 0x92, 0xad, 0xd4, 0xf6, 0x99, 0x6e, + 0xd7, 0xae, 0xa5, 0xdb, 0x8f, 0xa0, 0x7d, 0xcc, 0xb8, 0x3f, 0x31, 0xfa, 0x6a, 0xeb, 0x71, 0x01, + 0xe5, 0xa2, 0xda, 0x43, 0x3e, 0x03, 0x3b, 0xc6, 0x33, 0x2d, 0x32, 0x4b, 0x0a, 0x59, 0x18, 0x53, + 0xaa, 0x32, 0x2a, 0x6f, 0x61, 0xbd, 0xea, 0x16, 0xc8, 0x16, 0x74, 0x02, 0x16, 0xbf, 0x99, 0x78, + 0xe8, 0xa3, 0x44, 0xcf, 0xa9, 0xf7, 0xac, 0x7e, 0x93, 0xb6, 0x95, 0x6f, 0x94, 0xb8, 0xc8, 0x63, + 0xb8, 0x1d, 0x46, 0x1e, 0x4e, 0xb2, 0xbf, 0x72, 0x23, 0xa1, 0x40, 0x39, 0x0f, 0x8c, 0x2f, 0xaf, + 0x89, 0xcd, 0xa2, 0x26, 0x76, 0xa1, 0x19, 0xe3, 0xf4, 0x62, 0xea, 0xa3, 0xe7, 0xb4, 0xf4, 0xee, + 0x99, 0xed, 0x3e, 0x85, 0x3b, 0xa3, 0x38, 0x9a, 0x15, 0x74, 0x26, 0x27, 0x12, 0x56, 0x41, 0x24, + 0x86, 0x7f, 0xd4, 0xa1, 0x93, 0xb0, 0x98, 0x3c, 0x79, 0xc8, 0x0c, 0xc8, 0x1e, 0xca, 0xdd, 0x28, + 0x98, 0x45, 0x21, 0x86, 0x32, 0xf9, 0x0d, 0x91, 0xe7, 0x4b, 0xfe, 0xe0, 0x8b, 0xa1, 0x06, 0xb2, + 0xfb, 0xc1, 0x92, 0x8c, 0x52, 0xb8, 0xbb, 0x46, 0x02, 0x8d, 0x78, 0xc8, 0x03, 0x3c, 0xe4, 0xd3, + 0x37, 0xbb, 0xa7, 0x2c, 0x0c, 0xd1, 0xbf, 0x0c, 0xb1, 0x14, 0x9a, 0x22, 0x3e, 0x2e, 0x66, 0x18, + 0xe3, 0x40, 0xc6, 0x3c, 0x3c, 0x49, 0x67, 0xc0, 0x5d, 0x23, 0x67, 0x70, 0x6f, 0x0f, 0x35, 0x3a, + 0x17, 0x92, 0x4f, 0x45, 0x0a, 0x38, 0x5c, 0x0e, 0xb8, 0x10, 0x7c, 0x4d, 0xc8, 0x29, 0x74, 0xf2, + 0x2f, 0x2c, 0xf2, 0x61, 0x55, 0xdb, 0x55, 0xbc, 0x10, 0xbb, 0xfd, 0xab, 0x03, 0x33, 0x90, 0xef, + 0x00, 0xde, 0x76, 0x2e, 0x59, 0xad, 0xb3, 0x17, 0x6f, 0xa9, 0x1c, 0x96, 0x6d, 0xcf, 0x61, 0xa3, + 0xf8, 0x34, 0x21, 0x1f, 0x55, 0xe5, 0x56, 0x3e, 0xdc, 0xba, 0x1f, 0xaf, 0x12, 0x9a, 0x41, 0xc5, + 0xb0, 0xb9, 0x20, 0x62, 0xe4, 0xe9, 0x65, 0x5b, 0x94, 0x75, 0xbc, 0xfb, 0x6c, 0xc5, 0xe8, 0x0c, + 0x73, 0x1f, 0x5a, 0xd9, 0xd4, 0x90, 0x27, 0x55, 0xd9, 0xe5, 0xa1, 0xea, 0x5e, 0x26, 0x9f, 0xee, + 0xda, 0xf0, 0x57, 0xdb, 0x48, 0x9e, 0xbe, 0xf2, 0xff, 0xc6, 0xea, 0xdd, 0x8f, 0xd5, 0x21, 0xb4, + 0x73, 0xcf, 0x54, 0x52, 0xd9, 0xcb, 0x8b, 0xef, 0xd8, 0x2b, 0xee, 0x6d, 0xe7, 0x93, 0x6f, 0x87, + 0x27, 0x5c, 0x9e, 0xce, 0x8f, 0xd4, 0xca, 0x76, 0x12, 0xfa, 0x8c, 0x47, 0xe6, 0x6b, 0x3b, 0x2d, + 0x60, 0x5b, 0x67, 0x6f, 0x6b, 0x94, 0xd9, 0xd1, 0x51, 0x5d, 0x9b, 0x2f, 0xfe, 0x0a, 0x00, 0x00, + 0xff, 0xff, 0x9b, 0x6b, 0x7c, 0x8a, 0x41, 0x0e, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -825,7 +893,6 @@ type IndexServiceClient interface { BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*BuildIndexResponse, error) GetIndexStates(ctx context.Context, in *GetIndexStatesRequest, opts ...grpc.CallOption) (*GetIndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, in *GetIndexFilePathsRequest, opts ...grpc.CallOption) (*GetIndexFilePathsResponse, error) - NotifyBuildIndex(ctx context.Context, in *NotifyBuildIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) } @@ -900,15 +967,6 @@ func (c *indexServiceClient) GetIndexFilePaths(ctx context.Context, in *GetIndex return out, nil } -func (c *indexServiceClient) NotifyBuildIndex(ctx context.Context, in *NotifyBuildIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { - out := new(commonpb.Status) - err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexService/NotifyBuildIndex", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - func (c *indexServiceClient) DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { out := new(commonpb.Status) err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexService/DropIndex", in, out, opts...) @@ -927,7 +985,6 @@ type IndexServiceServer interface { BuildIndex(context.Context, *BuildIndexRequest) (*BuildIndexResponse, error) GetIndexStates(context.Context, *GetIndexStatesRequest) (*GetIndexStatesResponse, error) GetIndexFilePaths(context.Context, *GetIndexFilePathsRequest) (*GetIndexFilePathsResponse, error) - NotifyBuildIndex(context.Context, *NotifyBuildIndexRequest) (*commonpb.Status, error) DropIndex(context.Context, *DropIndexRequest) (*commonpb.Status, error) } @@ -956,9 +1013,6 @@ func (*UnimplementedIndexServiceServer) GetIndexStates(ctx context.Context, req func (*UnimplementedIndexServiceServer) GetIndexFilePaths(ctx context.Context, req *GetIndexFilePathsRequest) (*GetIndexFilePathsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetIndexFilePaths not implemented") } -func (*UnimplementedIndexServiceServer) NotifyBuildIndex(ctx context.Context, req *NotifyBuildIndexRequest) (*commonpb.Status, error) { - return nil, status.Errorf(codes.Unimplemented, "method NotifyBuildIndex not implemented") -} func (*UnimplementedIndexServiceServer) DropIndex(ctx context.Context, req *DropIndexRequest) (*commonpb.Status, error) { return nil, status.Errorf(codes.Unimplemented, "method DropIndex not implemented") } @@ -1093,24 +1147,6 @@ func _IndexService_GetIndexFilePaths_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } -func _IndexService_NotifyBuildIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(NotifyBuildIndexRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(IndexServiceServer).NotifyBuildIndex(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.index.IndexService/NotifyBuildIndex", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(IndexServiceServer).NotifyBuildIndex(ctx, req.(*NotifyBuildIndexRequest)) - } - return interceptor(ctx, in, info, handler) -} - func _IndexService_DropIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(DropIndexRequest) if err := dec(in); err != nil { @@ -1161,10 +1197,6 @@ var _IndexService_serviceDesc = grpc.ServiceDesc{ MethodName: "GetIndexFilePaths", Handler: _IndexService_GetIndexFilePaths_Handler, }, - { - MethodName: "NotifyBuildIndex", - Handler: _IndexService_NotifyBuildIndex_Handler, - }, { MethodName: "DropIndex", Handler: _IndexService_DropIndex_Handler, @@ -1181,8 +1213,7 @@ type IndexNodeClient interface { GetComponentStates(ctx context.Context, in *internalpb.GetComponentStatesRequest, opts ...grpc.CallOption) (*internalpb.ComponentStates, error) GetTimeTickChannel(ctx context.Context, in *internalpb.GetTimeTickChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) GetStatisticsChannel(ctx context.Context, in *internalpb.GetStatisticsChannelRequest, opts ...grpc.CallOption) (*milvuspb.StringResponse, error) - BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) - DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + CreateIndex(ctx context.Context, in *CreateIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) } type indexNodeClient struct { @@ -1220,18 +1251,9 @@ func (c *indexNodeClient) GetStatisticsChannel(ctx context.Context, in *internal return out, nil } -func (c *indexNodeClient) BuildIndex(ctx context.Context, in *BuildIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { +func (c *indexNodeClient) CreateIndex(ctx context.Context, in *CreateIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { out := new(commonpb.Status) - err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexNode/BuildIndex", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *indexNodeClient) DropIndex(ctx context.Context, in *DropIndexRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { - out := new(commonpb.Status) - err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexNode/DropIndex", in, out, opts...) + err := c.cc.Invoke(ctx, "/milvus.proto.index.IndexNode/CreateIndex", in, out, opts...) if err != nil { return nil, err } @@ -1243,8 +1265,7 @@ type IndexNodeServer interface { GetComponentStates(context.Context, *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) GetTimeTickChannel(context.Context, *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) GetStatisticsChannel(context.Context, *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) - BuildIndex(context.Context, *BuildIndexRequest) (*commonpb.Status, error) - DropIndex(context.Context, *DropIndexRequest) (*commonpb.Status, error) + CreateIndex(context.Context, *CreateIndexRequest) (*commonpb.Status, error) } // UnimplementedIndexNodeServer can be embedded to have forward compatible implementations. @@ -1260,11 +1281,8 @@ func (*UnimplementedIndexNodeServer) GetTimeTickChannel(ctx context.Context, req func (*UnimplementedIndexNodeServer) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetStatisticsChannel not implemented") } -func (*UnimplementedIndexNodeServer) BuildIndex(ctx context.Context, req *BuildIndexRequest) (*commonpb.Status, error) { - return nil, status.Errorf(codes.Unimplemented, "method BuildIndex not implemented") -} -func (*UnimplementedIndexNodeServer) DropIndex(ctx context.Context, req *DropIndexRequest) (*commonpb.Status, error) { - return nil, status.Errorf(codes.Unimplemented, "method DropIndex not implemented") +func (*UnimplementedIndexNodeServer) CreateIndex(ctx context.Context, req *CreateIndexRequest) (*commonpb.Status, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateIndex not implemented") } func RegisterIndexNodeServer(s *grpc.Server, srv IndexNodeServer) { @@ -1325,38 +1343,20 @@ func _IndexNode_GetStatisticsChannel_Handler(srv interface{}, ctx context.Contex return interceptor(ctx, in, info, handler) } -func _IndexNode_BuildIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(BuildIndexRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(IndexNodeServer).BuildIndex(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/milvus.proto.index.IndexNode/BuildIndex", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(IndexNodeServer).BuildIndex(ctx, req.(*BuildIndexRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _IndexNode_DropIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DropIndexRequest) +func _IndexNode_CreateIndex_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateIndexRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(IndexNodeServer).DropIndex(ctx, in) + return srv.(IndexNodeServer).CreateIndex(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/milvus.proto.index.IndexNode/DropIndex", + FullMethod: "/milvus.proto.index.IndexNode/CreateIndex", } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(IndexNodeServer).DropIndex(ctx, req.(*DropIndexRequest)) + return srv.(IndexNodeServer).CreateIndex(ctx, req.(*CreateIndexRequest)) } return interceptor(ctx, in, info, handler) } @@ -1378,12 +1378,8 @@ var _IndexNode_serviceDesc = grpc.ServiceDesc{ Handler: _IndexNode_GetStatisticsChannel_Handler, }, { - MethodName: "BuildIndex", - Handler: _IndexNode_BuildIndex_Handler, - }, - { - MethodName: "DropIndex", - Handler: _IndexNode_DropIndex_Handler, + MethodName: "CreateIndex", + Handler: _IndexNode_CreateIndex_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/internal/types/types.go b/internal/types/types.go index 9d74e1a5d9c96fc5cfa76867704ce3e2e78de579..2f3aaf3fbf1148454e1aefa74f891dfff22d58e8 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -67,8 +67,7 @@ type IndexNode interface { Component TimeTickProvider - BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) - DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) + CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) } type IndexService interface { @@ -80,7 +79,6 @@ type IndexService interface { DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) - NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) } type MasterService interface {