未验证 提交 a606ab9c 编写于 作者: C cai.zhang 提交者: GitHub

Add power-off restart logic for IndexService and IndexNode (#5395)

Add power-off restart logic for IndexService and IndexNode
Signed-off-by: N&lt;xiaocai2333&gt; <cai.zhang@zilliz.com>
上级 dde74e21
......@@ -153,16 +153,9 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
return ret.(*milvuspb.StringResponse), err
}
func (c *Client) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
func (c *Client) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.BuildIndex(ctx, req)
})
return ret.(*commonpb.Status), err
}
func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.DropIndex(ctx, req)
return c.grpcClient.CreateIndex(ctx, req)
})
return ret.(*commonpb.Status), err
}
......@@ -191,12 +191,8 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetSt
return s.indexnode.GetStatisticsChannel(ctx)
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
return s.indexnode.BuildIndex(ctx, req)
}
func (s *Server) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
return s.indexnode.DropIndex(ctx, request)
func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
return s.indexnode.CreateIndex(ctx, req)
}
func NewServer(ctx context.Context) (*Server, error) {
......
......@@ -217,10 +217,3 @@ func (c *Client) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil
})
return ret.(*indexpb.GetIndexFilePathsResponse), err
}
func (c *Client) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.NotifyBuildIndex(ctx, nty)
})
return ret.(*commonpb.Status), err
}
......@@ -149,9 +149,6 @@ func (s *Server) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFil
return s.indexservice.GetIndexFilePaths(ctx, req)
}
func (s *Server) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) {
return s.indexservice.NotifyBuildIndex(ctx, nty)
}
func (s *Server) startGrpcLoop(grpcPort int) {
defer s.loopWg.Done()
......
......@@ -19,9 +19,13 @@ import (
"strconv"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -58,6 +62,9 @@ type IndexNode struct {
startCallbacks []func()
closeCallbacks []func()
etcdKV *etcdkv.EtcdKV
finishedTasks map[UniqueID]commonpb.IndexState
closer io.Closer
}
......@@ -88,7 +95,17 @@ func (i *IndexNode) Register() error {
func (i *IndexNode) Init() error {
ctx := context.Background()
err := funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
i.etcdKV = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
return err
}
err := retry.Retry(100000, time.Millisecond*200, connectEtcdFn)
if err != nil {
return err
}
err = funcutil.WaitForComponentHealthy(ctx, i.serviceClient, "IndexService", 1000000, time.Millisecond*200)
if err != nil {
return err
}
......@@ -98,6 +115,7 @@ func (i *IndexNode) Init() error {
Ip: Params.IP,
Port: int64(Params.Port),
},
ServerID: i.session.ServerID,
}
resp, err2 := i.serviceClient.RegisterNode(ctx, request)
......@@ -164,11 +182,13 @@ func (i *IndexNode) SetIndexServiceClient(serviceClient types.IndexService) {
i.serviceClient = serviceClient
}
func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
log.Debug("indexnode building index ...",
zap.Int64("IndexBuildID", request.IndexBuildID),
zap.String("Indexname", request.IndexName),
zap.Int64("IndexID", request.IndexID),
zap.Int64("Version", request.Version),
zap.String("MetaPath", request.MetaPath),
zap.Strings("DataPaths", request.DataPaths),
zap.Any("TypeParams", request.TypeParams),
zap.Any("IndexParams", request.IndexParams))
......@@ -180,6 +200,7 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexR
},
req: request,
kv: i.kv,
etcdKV: i.etcdKV,
serviceClient: i.serviceClient,
nodeID: Params.NodeID,
}
......@@ -194,34 +215,9 @@ func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexR
ret.Reason = err.Error()
return ret, nil
}
log.Debug("indexnode", zap.Int64("indexnode successfully schedule with indexBuildID", request.IndexBuildID))
return ret, nil
}
log.Debug("IndexNode", zap.Int64("IndexNode successfully schedule with indexBuildID", request.IndexBuildID))
func (i *IndexNode) DropIndex(ctx context.Context, request *indexpb.DropIndexRequest) (*commonpb.Status, error) {
log.Debug("IndexNode", zap.Int64("Drop index by id", request.IndexID))
indexBuildIDs := i.sched.IndexBuildQueue.tryToRemoveUselessIndexBuildTask(request.IndexID)
log.Debug("IndexNode", zap.Any("The index of the IndexBuildIDs to be deleted", indexBuildIDs))
for _, indexBuildID := range indexBuildIDs {
nty := &indexpb.NotifyBuildIndexRequest{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexBuildID: indexBuildID,
NodeID: Params.NodeID,
IndexFilePaths: []string{},
}
resp, err := i.serviceClient.NotifyBuildIndex(ctx, nty)
if err != nil {
log.Warn("IndexNode", zap.String("DropIndex notify error", err.Error()))
} else if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("IndexNode", zap.String("DropIndex notify error reason", resp.Reason))
}
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
}, nil
return ret, nil
}
// AddStartCallback adds a callback in the startServer phase.
......
......@@ -16,10 +16,15 @@ import (
"errors"
"runtime"
"strconv"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -83,8 +88,9 @@ type IndexBuildTask struct {
BaseTask
index Index
kv kv.BaseKV
etcdKV *etcdkv.EtcdKV
savePaths []string
req *indexpb.BuildIndexRequest
req *indexpb.CreateIndexRequest
serviceClient types.IndexService
nodeID UniqueID
}
......@@ -111,9 +117,55 @@ func (it *IndexBuildTask) OnEnqueue() error {
return nil
}
func (it *IndexBuildTask) checkIndexMeta(pre bool) error {
fn := func() error {
indexMeta := indexpb.IndexMeta{}
_, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath)
if err != nil {
log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath))
log.Debug("IndexService", zap.Any("Load meta error", err))
return err
}
err = proto.UnmarshalText(values[0], &indexMeta)
if err != nil {
log.Debug("IndexService", zap.Any("Unmarshal error", err))
return err
}
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version"))
return nil
}
if indexMeta.MarkDeleted {
indexMeta.State = commonpb.IndexState_Finished
v := proto.MarshalTextString(&indexMeta)
err := it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], v)
if err != nil {
return err
}
return nil
}
if pre {
return nil
}
indexMeta.IndexFilePaths = it.savePaths
indexMeta.State = commonpb.IndexState_Finished
if it.err != nil {
indexMeta.State = commonpb.IndexState_Failed
}
log.Debug("IndexNode", zap.Any("MetaPath", it.req.MetaPath))
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
return err
}
err := retry.Retry(3, time.Millisecond*200, fn)
return err
}
func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
log.Debug("preExecute...")
return nil
return it.checkIndexMeta(true)
}
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
......@@ -131,30 +183,7 @@ func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
return err
}
nty := &indexpb.NotifyBuildIndexRequest{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexBuildID: it.req.IndexBuildID,
NodeID: it.nodeID,
IndexFilePaths: it.savePaths,
}
if it.err != nil {
nty.Status.ErrorCode = commonpb.ErrorCode_BuildIndexError
}
ctx = context.TODO()
resp, err := it.serviceClient.NotifyBuildIndex(ctx, nty)
if err != nil {
log.Warn("indexnode", zap.String("error", err.Error()))
return err
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(resp.Reason)
log.Debug("indexnode", zap.String("[IndexBuildTask][PostExecute] err", err.Error()))
}
return err
return it.checkIndexMeta(false)
}
func (it *IndexBuildTask) Execute(ctx context.Context) error {
......@@ -309,7 +338,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
getSavePathByKey := func(key string) string {
// TODO: fix me, use more reasonable method
return strconv.Itoa(int(it.req.IndexBuildID)) + "/" + strconv.Itoa(int(partitionID)) + "/" + strconv.Itoa(int(segmentID)) + "/" + key
return strconv.Itoa(int(it.req.IndexBuildID)) + "/" + strconv.Itoa(int(it.req.Version)) + "/" + strconv.Itoa(int(partitionID)) + "/" + strconv.Itoa(int(segmentID)) + "/" + key
}
saveBlob := func(path string, value []byte) error {
return it.kv.Save(path, string(value))
......@@ -322,7 +351,26 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
savePath := getSavePathByKey(key)
err := saveBlob(savePath, value)
saveIndexFileFn := func() error {
v, err := it.etcdKV.Load(it.req.MetaPath)
if err != nil {
log.Debug("IndexService", zap.Any("load meta error with path", it.req.MetaPath))
log.Debug("IndexService", zap.Any("Load meta error", err))
return err
}
indexMeta := indexpb.IndexMeta{}
err = proto.UnmarshalText(v, &indexMeta)
if err != nil {
log.Debug("IndexService", zap.Any("Unmarshal error", err))
return err
}
if indexMeta.Version > it.req.Version {
log.Debug("IndexNode", zap.Any("Notify build index", "This version is not the latest version"))
return errors.New("This task has been reassigned ")
}
return saveBlob(savePath, value)
}
err := retry.Retry(5, time.Millisecond*200, saveIndexFileFn)
if err != nil {
return err
}
......
......@@ -15,9 +15,13 @@ import (
"context"
"errors"
"math/rand"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/mvcc/mvccpb"
"github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
......@@ -34,13 +38,12 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.etcd.io/etcd/clientv3"
)
const (
reqTimeoutInterval = time.Second * 10
durationInterval = time.Second * 10
dropIndexLimit = 20
recycleIndexLimit = 20
)
type IndexService struct {
......@@ -57,12 +60,18 @@ type IndexService struct {
sched *TaskScheduler
session *sessionutil.Session
eventChan <-chan *sessionutil.SessionEvent
assignChan chan []UniqueID
idAllocator *allocator.GlobalIDAllocator
kv kv.BaseKV
metaTable *metaTable
nodeTasks *nodeTasks
nodeLock sync.RWMutex
// Add callback functions at different stages
......@@ -80,6 +89,7 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
loopCtx: ctx1,
loopCancel: cancel,
nodeClients: &PriorityQueue{},
nodeTasks: &nodeTasks{},
}
return i, nil
......@@ -89,12 +99,14 @@ func NewIndexService(ctx context.Context) (*IndexService, error) {
func (i *IndexService) Register() error {
i.session = sessionutil.NewSession(i.loopCtx, Params.MetaRootPath, []string{Params.EtcdAddress})
i.session.Init(typeutil.IndexServiceRole, Params.Address, true)
i.eventChan = i.session.WatchServices(typeutil.IndexNodeRole, 0)
return nil
}
func (i *IndexService) Init() error {
log.Debug("indexservice", zap.String("etcd address", Params.EtcdAddress))
i.assignChan = make(chan []UniqueID, 1024)
connectEtcdFn := func() error {
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
if err != nil {
......@@ -144,6 +156,13 @@ func (i *IndexService) Init() error {
return err
}
i.UpdateStateCode(internalpb.StateCode_Healthy)
i.nodeTasks = NewNodeTasks()
err = i.assignTasksServerStart()
if err != nil {
return err
}
return nil
}
......@@ -152,7 +171,16 @@ func (i *IndexService) Start() error {
go i.tsLoop()
i.loopWg.Add(1)
go i.dropIndexLoop()
go i.recycleUnusedIndexFiles()
i.loopWg.Add(1)
go i.assignmentTasksLoop()
i.loopWg.Add(1)
go i.watchNodeLoop()
i.loopWg.Add(1)
go i.watchMetaLoop()
i.sched.Start()
// Start callbacks
......@@ -218,13 +246,24 @@ func (i *IndexService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Stri
}
func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
log.Debug("indexservice building index ...",
log.Debug("IndexService building index ...",
zap.Int64("IndexBuildID", req.IndexBuildID),
zap.String("IndexName = ", req.IndexName),
zap.Int64("IndexID = ", req.IndexID),
zap.Strings("DataPath = ", req.DataPaths),
zap.Any("TypeParams", req.TypeParams),
zap.Any("IndexParams", req.IndexParams))
hasIndex, indexBuildID := i.metaTable.HasSameReq(req)
if hasIndex {
log.Debug("IndexService", zap.Any("hasIndex true", indexBuildID))
return &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "already have same index",
},
IndexBuildID: indexBuildID,
}, nil
}
ret := &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
......@@ -273,6 +312,7 @@ func (i *IndexService) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRe
ret.Status.Reason = err.Error()
return ret, nil
}
i.assignChan <- []UniqueID{t.indexBuildID}
ret.Status.ErrorCode = commonpb.ErrorCode_Success
ret.IndexBuildID = t.indexBuildID
return ret, nil
......@@ -322,13 +362,6 @@ func (i *IndexService) DropIndex(ctx context.Context, req *indexpb.DropIndexRequ
i.metaTable.DeleteIndex(indexBuildID)
}
}()
go func() {
allNodeClients := i.nodeClients.PeekAllClients()
for _, client := range allNodeClients {
client.DropIndex(ctx, req)
}
}()
}()
log.Debug("IndexService", zap.Int64("DropIndex success by ID", req.IndexID))
......@@ -359,27 +392,6 @@ func (i *IndexService) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIn
return ret, nil
}
func (i *IndexService) NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error) {
log.Debug("IndexService",
zap.Int64("notify build index", nty.IndexBuildID),
zap.Strings("index file paths", nty.IndexFilePaths),
zap.Int64("node ID", nty.NodeID))
ret := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
log.Debug("IndexService", zap.String("[IndexService][NotifyBuildIndex]", nty.String()))
if err := i.metaTable.NotifyBuildIndex(nty); err != nil {
ret.ErrorCode = commonpb.ErrorCode_BuildIndexError
ret.Reason = err.Error()
log.Debug("IndexService", zap.String("[IndexService][NotifyBuildIndex][metaTable][NotifyBuildIndex]", err.Error()))
}
log.Debug("IndexService", zap.Any("Index build completed with ID", nty.IndexBuildID))
i.nodeClients.IncPriority(nty.NodeID, -1)
return ret, nil
}
func (i *IndexService) tsLoop() {
tsoTicker := time.NewTicker(tso.UpdateTimestampStep)
defer tsoTicker.Stop()
......@@ -401,27 +413,177 @@ func (i *IndexService) tsLoop() {
}
}
func (i *IndexService) dropIndexLoop() {
func (i *IndexService) recycleUnusedIndexFiles() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
timeTicker := time.NewTicker(durationInterval)
log.Debug("IndexService start recycle unused index files loop")
log.Debug("IndexService start drop index loop")
for {
select {
case <-ctx.Done():
return
case <-timeTicker.C:
indexMetas := i.metaTable.getMarkDeleted(dropIndexLimit)
for j := 0; j < len(indexMetas); j++ {
if err := i.kv.MultiRemove(indexMetas[j].IndexFilePaths); err != nil {
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
metas := i.metaTable.GetUnusedIndexFiles(recycleIndexLimit)
for _, meta := range metas {
if meta.indexMeta.MarkDeleted {
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID))
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
}
i.metaTable.DeleteIndex(meta.indexMeta.IndexBuildID)
} else {
for j := 1; j < int(meta.indexMeta.Version); j++ {
unusedIndexFilePathPrefix := strconv.Itoa(int(meta.indexMeta.IndexBuildID)) + "/" + strconv.Itoa(j)
if err := i.kv.RemoveWithPrefix(unusedIndexFilePathPrefix); err != nil {
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
}
}
if err := i.metaTable.UpdateRecycleState(meta.indexMeta.IndexBuildID); err != nil {
log.Debug("IndexService", zap.String("Remove index files error", err.Error()))
}
}
i.metaTable.DeleteIndex(indexMetas[j].IndexBuildID)
}
}
}
}
func (i *IndexService) assignmentTasksLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexService start assign tasks loop")
for {
select {
case <-ctx.Done():
return
case indexBuildIDs := <-i.assignChan:
for _, indexBuildID := range indexBuildIDs {
meta := i.metaTable.GetIndexMeta(indexBuildID)
log.Debug("IndexService", zap.Any("Meta", meta))
if meta.indexMeta.State == commonpb.IndexState_Finished {
continue
}
if err := i.metaTable.UpdateVersion(indexBuildID); err != nil {
log.Debug("IndexService", zap.String("build index update version err", err.Error()))
}
nodeID, builderClient, nodeServerID := i.nodeClients.PeekClient()
if builderClient == nil {
log.Debug("IndexService has no available IndexNode")
i.assignChan <- []UniqueID{indexBuildID}
continue
}
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID,
IndexName: meta.indexMeta.Req.IndexName,
IndexID: meta.indexMeta.Req.IndexID,
Version: meta.indexMeta.Version + 1,
MetaPath: "/indexes/" + strconv.FormatInt(indexBuildID, 10),
DataPaths: meta.indexMeta.Req.DataPaths,
TypeParams: meta.indexMeta.Req.TypeParams,
IndexParams: meta.indexMeta.Req.IndexParams,
}
resp, err := builderClient.CreateIndex(ctx, req)
if err != nil {
log.Debug("IndexService", zap.String("build index err", err.Error()))
}
if err = i.metaTable.BuildIndex(indexBuildID, nodeServerID); err != nil {
log.Debug("IndexService", zap.String("update meta table error", err.Error()))
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Debug("IndexService", zap.String("build index err", resp.Reason))
}
i.nodeClients.IncPriority(nodeID, 1)
}
}
}
}
func (i *IndexService) watchNodeLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexService start watch node loop")
for {
select {
case <-ctx.Done():
return
case event := <-i.eventChan:
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
log.Debug("IndexService", zap.Any("Add IndexNode, session serverID", serverID))
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Debug("IndexService", zap.Any("The IndexNode crashed with ID", serverID))
indexBuildIDs := i.nodeTasks.getTasksByLeaseKey(serverID)
i.assignChan <- indexBuildIDs
i.nodeTasks.delete(serverID)
}
}
}
}
func (i *IndexService) watchMetaLoop() {
ctx, cancel := context.WithCancel(i.loopCtx)
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexService start watch meta loop")
watchChan := i.metaTable.client.WatchWithPrefix("indexes")
for {
select {
case <-ctx.Done():
return
case resp := <-watchChan:
log.Debug("meta updated.")
for _, event := range resp.Events {
eventRevision := event.Kv.Version
indexMeta := &indexpb.IndexMeta{}
err := proto.UnmarshalText(string(event.Kv.Value), indexMeta)
if err != nil {
log.Debug("IndexService", zap.Any("Unmarshal error", err))
}
indexBuildID := indexMeta.IndexBuildID
switch event.Type {
case mvccpb.PUT:
//TODO: get indexBuildID fast
log.Debug("IndexService", zap.Any("Meta need load by IndexBuildID", indexBuildID))
reload := i.metaTable.LoadMetaFromETCD(indexBuildID, eventRevision)
if reload {
i.nodeTasks.finishTask(indexBuildID)
}
case mvccpb.DELETE:
}
}
}
}
}
func (i *IndexService) assignTasksServerStart() error {
sessions, _, err := i.session.GetSessions(typeutil.IndexNodeRole)
if err != nil {
return err
}
var serverIDs []int64
for _, session := range sessions {
serverIDs = append(serverIDs, session.ServerID)
}
tasks := i.metaTable.GetUnassignedTasks(serverIDs)
for _, taskQueue := range tasks {
i.assignChan <- taskQueue
}
return nil
}
......@@ -15,24 +15,37 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
)
const (
RequestTimeout = 10 * time.Second
maxTasks = 10
)
type Meta struct {
indexMeta *indexpb.IndexMeta
revision int64
}
type metaTable struct {
client kv.TxnKV // client of a reliable kv service, i.e. etcd client
indexBuildID2Meta map[UniqueID]indexpb.IndexMeta // index build id to index meta
client *etcdkv.EtcdKV // client of a reliable kv service, i.e. etcd client
indexBuildID2Meta map[UniqueID]Meta // index build id to index meta
lock sync.RWMutex
}
func NewMetaTable(kv kv.TxnKV) (*metaTable, error) {
func NewMetaTable(kv *etcdkv.EtcdKV) (*metaTable, error) {
mt := &metaTable{
client: kv,
lock: sync.RWMutex{},
......@@ -46,31 +59,70 @@ func NewMetaTable(kv kv.TxnKV) (*metaTable, error) {
}
func (mt *metaTable) reloadFromKV() error {
mt.indexBuildID2Meta = make(map[UniqueID]indexpb.IndexMeta)
mt.indexBuildID2Meta = make(map[UniqueID]Meta)
key := "indexes"
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
_, values, err := mt.client.LoadWithPrefix("indexes")
_, values, versions, err := mt.client.LoadWithPrefix2(key)
if err != nil {
return err
}
for _, value := range values {
for i := 0; i < len(values); i++ {
indexMeta := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMeta)
err = proto.UnmarshalText(values[i], &indexMeta)
if err != nil {
return fmt.Errorf("IndexService metaTable reloadFromKV UnmarshalText indexpb.IndexMeta err:%w", err)
}
mt.indexBuildID2Meta[indexMeta.IndexBuildID] = indexMeta
meta := &Meta{
indexMeta: &indexMeta,
revision: versions[i],
}
mt.indexBuildID2Meta[indexMeta.IndexBuildID] = *meta
}
return nil
}
// metaTable.lock.Lock() before call this function
func (mt *metaTable) saveIndexMeta(meta *indexpb.IndexMeta) error {
value := proto.MarshalTextString(meta)
func (mt *metaTable) saveIndexMeta(meta *Meta) error {
value := proto.MarshalTextString(meta.indexMeta)
key := "indexes/" + strconv.FormatInt(meta.indexMeta.IndexBuildID, 10)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
err := mt.client.CompareVersionAndSwap(key, meta.revision, value)
if err != nil {
return err
}
meta.revision = meta.revision + 1
mt.indexBuildID2Meta[meta.indexMeta.IndexBuildID] = *meta
return nil
}
func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
mt.indexBuildID2Meta[meta.IndexBuildID] = *meta
_, values, version, err := mt.client.LoadWithPrefix2(key)
if err != nil {
return nil, err
}
im := &indexpb.IndexMeta{}
err = proto.UnmarshalText(values[0], im)
if err != nil {
return nil, err
}
if im.State == commonpb.IndexState_Finished {
return nil, nil
}
m := &Meta{
revision: version[0],
indexMeta: im,
}
return mt.client.Save("/indexes/"+strconv.FormatInt(meta.IndexBuildID, 10), value)
return m, nil
}
func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequest) error {
......@@ -81,67 +133,115 @@ func (mt *metaTable) AddIndex(indexBuildID UniqueID, req *indexpb.BuildIndexRequ
if ok {
return fmt.Errorf("index already exists with ID = %d", indexBuildID)
}
meta := &indexpb.IndexMeta{
State: commonpb.IndexState_Unissued,
IndexBuildID: indexBuildID,
Req: req,
meta := &Meta{
indexMeta: &indexpb.IndexMeta{
State: commonpb.IndexState_Unissued,
IndexBuildID: indexBuildID,
Req: req,
NodeServerID: 0,
Version: 0,
},
revision: 0,
}
return mt.saveIndexMeta(meta)
}
func (mt *metaTable) BuildIndex(indexBuildID UniqueID) error {
func (mt *metaTable) BuildIndex(indexBuildID UniqueID, serverID int64) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("IndexService update index state")
log.Debug("IndexService update index meta")
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
if meta.State != commonpb.IndexState_Unissued {
return fmt.Errorf("can not update index state, index with ID = %d state is %d", indexBuildID, meta.State)
if meta.indexMeta.State != commonpb.IndexState_Unissued {
return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State)
}
meta.State = commonpb.IndexState_InProgress
return mt.saveIndexMeta(&meta)
meta.indexMeta.NodeServerID = serverID
err := mt.saveIndexMeta(&meta)
if err != nil {
fn := func() error {
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
return err
}
m.indexMeta.NodeServerID = serverID
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
if err2 != nil {
return err2
}
}
return nil
}
func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
func (mt *metaTable) UpdateVersion(indexBuildID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("IndexService update index version")
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
log.Debug("indexservice", zap.Int64("mark index is deleted", indexID))
if meta.indexMeta.State != commonpb.IndexState_Unissued {
return fmt.Errorf("can not set lease key, index with ID = %d state is %d", indexBuildID, meta.indexMeta.State)
}
for indexBuildID, meta := range mt.indexBuildID2Meta {
if meta.Req.IndexID == indexID {
meta.MarkDeleted = true
mt.indexBuildID2Meta[indexBuildID] = meta
meta.indexMeta.Version = meta.indexMeta.Version + 1
err := mt.saveIndexMeta(&meta)
if err != nil {
fn := func() error {
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
return err
}
m.indexMeta.Version = m.indexMeta.Version + 1
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
return err2
}
return nil
}
func (mt *metaTable) NotifyBuildIndex(nty *indexpb.NotifyBuildIndexRequest) error {
func (mt *metaTable) MarkIndexAsDeleted(indexID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("indexservice", zap.Int64("notify build index", nty.IndexBuildID))
indexBuildID := nty.IndexBuildID
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
log.Debug("indexservice", zap.Int64("mark index is deleted", indexID))
if nty.Status.ErrorCode != commonpb.ErrorCode_Success {
meta.State = commonpb.IndexState_Failed
meta.FailReason = nty.Status.Reason
} else {
meta.State = commonpb.IndexState_Finished
meta.IndexFilePaths = nty.IndexFilePaths
for _, meta := range mt.indexBuildID2Meta {
if meta.indexMeta.Req.IndexID == indexID {
meta.indexMeta.MarkDeleted = true
if err := mt.saveIndexMeta(&meta); err != nil {
log.Debug("IndexService", zap.Any("Meta table mark deleted err", err.Error()))
fn := func() error {
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
return err
}
m.indexMeta.MarkDeleted = true
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
if err2 != nil {
return err2
}
}
}
}
return mt.saveIndexMeta(&meta)
return nil
}
func (mt *metaTable) GetIndexState(indexBuildID UniqueID) (*indexpb.IndexInfo, error) {
......@@ -154,13 +254,13 @@ func (mt *metaTable) GetIndexState(indexBuildID UniqueID) (*indexpb.IndexInfo, e
if !ok {
return ret, fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
if meta.MarkDeleted {
if meta.indexMeta.MarkDeleted {
return ret, fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
ret.IndexID = meta.Req.IndexID
ret.IndexName = meta.Req.IndexName
ret.Reason = meta.FailReason
ret.State = meta.State
ret.IndexID = meta.indexMeta.Req.IndexID
ret.IndexName = meta.indexMeta.Req.IndexName
ret.Reason = meta.indexMeta.FailReason
ret.State = meta.indexMeta.State
return ret, nil
}
......@@ -174,10 +274,10 @@ func (mt *metaTable) GetIndexFilePathInfo(indexBuildID UniqueID) (*indexpb.Index
if !ok {
return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
if meta.MarkDeleted {
if meta.indexMeta.MarkDeleted {
return nil, fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
ret.IndexFilePaths = meta.IndexFilePaths
ret.IndexFilePaths = meta.indexMeta.IndexFilePaths
return ret, nil
}
......@@ -186,23 +286,227 @@ func (mt *metaTable) DeleteIndex(indexBuildID UniqueID) {
defer mt.lock.Unlock()
delete(mt.indexBuildID2Meta, indexBuildID)
key := "indexes/" + strconv.FormatInt(indexBuildID, 10)
err := mt.client.Remove(key)
if err != nil {
log.Debug("IndexService", zap.Any("Delete IndexMeta in etcd error", err))
}
}
func (mt *metaTable) getMarkDeleted(limit int) []indexpb.IndexMeta {
func (mt *metaTable) UpdateRecycleState(indexBuildID UniqueID) error {
mt.lock.Lock()
defer mt.lock.Unlock()
log.Debug("IndexService get mark deleted meta")
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
return fmt.Errorf("index not exists with ID = %d", indexBuildID)
}
meta.indexMeta.Recycled = true
if err := mt.saveIndexMeta(&meta); err != nil {
fn := func() error {
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
return err
}
var indexMetas []indexpb.IndexMeta
m.indexMeta.Recycled = true
return mt.saveIndexMeta(m)
}
err2 := retry.Retry(5, time.Millisecond*200, fn)
if err2 != nil {
return err2
}
}
return nil
}
func (mt *metaTable) GetUnusedIndexFiles(limit int) []Meta {
mt.lock.Lock()
defer mt.lock.Unlock()
var metas []Meta
for _, meta := range mt.indexBuildID2Meta {
if meta.MarkDeleted && meta.State == commonpb.IndexState_Finished {
indexMetas = append(indexMetas, meta)
if meta.indexMeta.State == commonpb.IndexState_Finished && (meta.indexMeta.MarkDeleted || !meta.indexMeta.Recycled) {
metas = append(metas, meta)
}
if len(metas) >= limit {
return metas
}
}
return metas
}
func (mt *metaTable) GetIndexMeta(indexBuildID UniqueID) Meta {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if !ok {
log.Debug("IndexService", zap.Any("Meta table does not have the meta with indexBuildID", indexBuildID))
}
return meta
}
func (mt *metaTable) GetUnassignedTasks(serverIDs []int64) [][]UniqueID {
var tasks [][]UniqueID
var indexBuildIDs []UniqueID
for indexBuildID, meta := range mt.indexBuildID2Meta {
alive := false
for _, serverID := range serverIDs {
if meta.indexMeta.NodeServerID == serverID {
alive = true
}
}
if !alive {
indexBuildIDs = append(indexBuildIDs, indexBuildID)
}
if len(indexMetas) >= limit {
return indexMetas
if len(indexBuildIDs) >= 10 {
tasks = append(tasks, indexBuildIDs)
indexBuildIDs = []UniqueID{}
}
}
tasks = append(tasks, indexBuildIDs)
return tasks
}
func compare2Array(arr1, arr2 interface{}) bool {
p1, ok := arr1.([]*commonpb.KeyValuePair)
if ok {
p2, ok1 := arr2.([]*commonpb.KeyValuePair)
if ok1 {
for _, param1 := range p1 {
sameParams := false
for _, param2 := range p2 {
if param1.Key == param2.Key && param1.Value == param2.Value {
sameParams = true
}
}
if !sameParams {
return false
}
}
return true
}
log.Error("indexservice", zap.Any("type error", "arr2 type should be commonpb.KeyValuePair"))
return false
}
v1, ok2 := arr1.([]string)
if ok2 {
v2, ok3 := arr2.([]string)
if ok3 {
for _, s1 := range v1 {
sameParams := false
for _, s2 := range v2 {
if s1 == s2 {
sameParams = true
}
}
if !sameParams {
return false
}
}
return true
}
log.Error("indexservice", zap.Any("type error", "arr2 type should be string array"))
return false
}
log.Error("indexservice", zap.Any("type error", "param type should be commonpb.KeyValuePair or string array"))
return false
}
func (mt *metaTable) HasSameReq(req *indexpb.BuildIndexRequest) (bool, UniqueID) {
mt.lock.Lock()
defer mt.lock.Unlock()
LOOP:
for _, meta := range mt.indexBuildID2Meta {
if meta.indexMeta.Req.IndexID == req.IndexID {
if len(meta.indexMeta.Req.DataPaths) != len(req.DataPaths) {
goto LOOP
}
if len(meta.indexMeta.Req.IndexParams) == len(req.IndexParams) &&
compare2Array(meta.indexMeta.Req.DataPaths, req.DataPaths) {
if !compare2Array(meta.indexMeta.Req.IndexParams, req.IndexParams) ||
!compare2Array(meta.indexMeta.Req.TypeParams, req.TypeParams) {
goto LOOP
}
return true, meta.indexMeta.IndexBuildID
}
}
}
return false, -1
}
func (mt *metaTable) LoadMetaFromETCD(indexBuildID int64, revision int64) bool {
mt.lock.Lock()
defer mt.lock.Unlock()
meta, ok := mt.indexBuildID2Meta[indexBuildID]
if ok {
if meta.revision >= revision {
return false
}
}
m, err := mt.reloadMeta(meta.indexMeta.IndexBuildID)
if m == nil {
log.Debug("IndexService", zap.Any("Load meta from etcd error", err))
return false
}
log.Debug("IndexService", zap.Any("IndexMeta", m))
mt.indexBuildID2Meta[indexBuildID] = *m
return true
}
type nodeTasks struct {
nodeID2Tasks map[int64][]UniqueID
}
func NewNodeTasks() *nodeTasks {
return &nodeTasks{
nodeID2Tasks: map[int64][]UniqueID{},
}
}
func (nt *nodeTasks) getTasksByLeaseKey(serverID int64) []UniqueID {
indexBuildIDs, ok := nt.nodeID2Tasks[serverID]
if !ok {
return nil
}
return indexBuildIDs
}
func (nt *nodeTasks) assignTask(serverID int64, indexBuildID UniqueID) {
indexBuildIDs, ok := nt.nodeID2Tasks[serverID]
if !ok {
var IDs []UniqueID
IDs = append(IDs, indexBuildID)
nt.nodeID2Tasks[serverID] = IDs
return
}
indexBuildIDs = append(indexBuildIDs, indexBuildID)
nt.nodeID2Tasks[serverID] = indexBuildIDs
}
func (nt *nodeTasks) finishTask(indexBuildID UniqueID) {
for serverID := range nt.nodeID2Tasks {
for i, buildID := range nt.nodeID2Tasks[serverID] {
if buildID == indexBuildID {
nt.nodeID2Tasks[serverID] = append(nt.nodeID2Tasks[serverID][:i], nt.nodeID2Tasks[serverID][:i+1]...)
}
}
}
}
return indexMetas
func (nt *nodeTasks) delete(serverID int64) {
delete(nt.nodeID2Tasks, serverID)
}
......@@ -53,6 +53,7 @@ func (i *IndexService) addNode(nodeID UniqueID, req *indexpb.RegisterNodeRequest
value: nodeClient,
key: nodeID,
addr: req.Address,
serverID: req.ServerID,
priority: 0,
}
i.nodeClients.Push(item)
......
......@@ -21,9 +21,12 @@ import (
// An Item is something we manage in a priority queue.
type PQItem struct {
value types.IndexNode // The value of the item; arbitrary.
key UniqueID
addr *commonpb.Address
value types.IndexNode // The value of the item; arbitrary.
key UniqueID
addr *commonpb.Address
serverID int64
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
......@@ -135,12 +138,13 @@ func (pq *PriorityQueue) Peek() interface{} {
//return item.value
}
func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode) {
// PeekClient picks an IndexNode with the lowest load.
func (pq *PriorityQueue) PeekClient() (UniqueID, types.IndexNode, int64) {
item := pq.Peek()
if item == nil {
return UniqueID(-1), nil
return UniqueID(-1), nil, 0
}
return item.(*PQItem).key, item.(*PQItem).value
return item.(*PQItem).key, item.(*PQItem).value, item.(*PQItem).serverID
}
func (pq *PriorityQueue) PeekAllClients() []types.IndexNode {
......
......@@ -15,12 +15,11 @@ import (
"context"
"errors"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
)
......@@ -108,36 +107,16 @@ func (it *IndexAddTask) OnEnqueue() error {
}
func (it *IndexAddTask) PreExecute(ctx context.Context) error {
log.Debug("pretend to check Index Req")
nodeID, builderClient := it.nodeClients.PeekClient()
if builderClient == nil {
return errors.New("IndexAddTask Service not available")
}
it.builderClient = builderClient
it.buildClientNodeID = nodeID
err := it.table.AddIndex(it.indexBuildID, it.req)
if err != nil {
return err
}
it.req.IndexBuildID = it.indexBuildID
return nil
}
func (it *IndexAddTask) Execute(ctx context.Context) error {
it.req.IndexBuildID = it.indexBuildID
log.Debug("before index ...")
resp, err := it.builderClient.BuildIndex(ctx, it.req)
if err != nil {
log.Debug("indexservice", zap.String("build index finish err", err.Error()))
return err
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(resp.Reason)
}
err = it.table.BuildIndex(it.indexBuildID)
log.Debug("IndexService", zap.Any("BuildIndex, IndexBuildID = ", it.indexBuildID))
err := it.table.AddIndex(it.indexBuildID, it.req)
if err != nil {
return err
}
it.nodeClients.IncPriority(it.buildClientNodeID, 1)
return nil
}
......
......@@ -68,6 +68,27 @@ func (kv *EtcdKV) LoadWithPrefix(key string) ([]string, []string, error) {
return keys, values, nil
}
func (kv *EtcdKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
key = path.Join(kv.rootPath, key)
log.Debug("LoadWithPrefix ", zap.String("prefix", key))
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
defer cancel()
resp, err := kv.client.Get(ctx, key, clientv3.WithPrefix(),
clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
return nil, nil, nil, err
}
keys := make([]string, 0, resp.Count)
values := make([]string, 0, resp.Count)
versions := make([]int64, 0, resp.Count)
for _, kv := range resp.Kvs {
keys = append(keys, string(kv.Key))
values = append(values, string(kv.Value))
versions = append(versions, kv.Version)
}
return keys, values, versions, nil
}
func (kv *EtcdKV) Load(key string) (string, error) {
key = path.Join(kv.rootPath, key)
ctx, cancel := context.WithTimeout(context.TODO(), RequestTimeout)
......
......@@ -16,7 +16,6 @@ service IndexService {
rpc BuildIndex(BuildIndexRequest) returns (BuildIndexResponse){}
rpc GetIndexStates(GetIndexStatesRequest) returns (GetIndexStatesResponse) {}
rpc GetIndexFilePaths(GetIndexFilePathsRequest) returns (GetIndexFilePathsResponse){}
rpc NotifyBuildIndex(NotifyBuildIndexRequest) returns (common.Status) {}
rpc DropIndex(DropIndexRequest) returns (common.Status) {}
}
......@@ -24,13 +23,13 @@ service IndexNode {
rpc GetComponentStates(internal.GetComponentStatesRequest) returns (internal.ComponentStates) {}
rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){}
rpc BuildIndex(BuildIndexRequest) returns (common.Status){}
rpc DropIndex(DropIndexRequest) returns (common.Status) {}
rpc CreateIndex(CreateIndexRequest) returns (common.Status){}
}
message RegisterNodeRequest {
common.MsgBase base = 1;
common.Address address = 2;
int64 serverID = 3;
}
message RegisterNodeResponse {
......@@ -55,25 +54,29 @@ message GetIndexStatesResponse {
repeated IndexInfo states = 2;
}
message BuildIndexRequest {
message CreateIndexRequest {
int64 indexBuildID = 1;
string index_name = 2;
int64 indexID = 3;
repeated string data_paths = 4;
repeated common.KeyValuePair type_params = 5;
repeated common.KeyValuePair index_params = 6;
int64 version = 4;
string meta_path = 5;
repeated string data_paths = 6;
repeated common.KeyValuePair type_params = 7;
repeated common.KeyValuePair index_params = 8;
}
message BuildIndexResponse {
common.Status status = 1;
int64 indexBuildID = 2;
message BuildIndexRequest {
int64 indexBuildID = 1;
string index_name = 2;
int64 indexID = 3;
repeated string data_paths = 5;
repeated common.KeyValuePair type_params = 6;
repeated common.KeyValuePair index_params = 7;
}
message NotifyBuildIndexRequest {
message BuildIndexResponse {
common.Status status = 1;
int64 indexBuildID = 2;
repeated string index_file_paths = 3;
int64 nodeID = 4;
}
message GetIndexFilePathsRequest {
......@@ -98,6 +101,9 @@ message IndexMeta {
BuildIndexRequest req = 4;
repeated string index_file_paths = 5;
bool mark_deleted = 6;
int64 node_serverID = 7;
int64 version = 8;
bool recycled = 9;
}
message DropIndexRequest {
......
......@@ -67,8 +67,7 @@ type IndexNode interface {
Component
TimeTickProvider
BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*commonpb.Status, error)
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error)
}
type IndexService interface {
......@@ -80,7 +79,6 @@ type IndexService interface {
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error)
GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error)
NotifyBuildIndex(ctx context.Context, nty *indexpb.NotifyBuildIndexRequest) (*commonpb.Status, error)
}
type MasterService interface {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册