提交 add31188 编写于 作者: S sunby 提交者: yefu.chen

Add CreateIndex RPC

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 14332d49
......@@ -443,8 +443,39 @@ func (s *Master) AssignSegmentID(ctx context.Context, request *internalpb.Assign
}, nil
}
func (s *Master) CreateIndex(context.Context, *internalpb.CreateIndexRequest) (*commonpb.Status, error) {
return nil, nil
func (s *Master) CreateIndex(ctx context.Context, req *internalpb.CreateIndexRequest) (*commonpb.Status, error) {
task := &createIndexTask{
baseTask: baseTask{
sch: s.scheduler,
mt: s.metaTable,
cv: make(chan error),
},
req: req,
indexBuildScheduler: s.indexBuildSch,
indexLoadScheduler: s.indexLoadSch,
segManager: s.segmentManager,
}
err := s.scheduler.Enqueue(task)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed: " + err.Error(),
}, nil
}
err = task.WaitToFinish(ctx)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Create Index error: " + err.Error(),
}, nil
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
Reason: "",
}, nil
}
func (s *Master) DescribeIndex(context.Context, *internalpb.DescribeIndexRequest) (*servicepb.DescribeIndexResponse, error) {
......
package master
import (
"fmt"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
type createIndexTask struct {
baseTask
req *internalpb.CreateIndexRequest
indexBuildScheduler *IndexBuildScheduler
indexLoadScheduler *IndexLoadScheduler
segManager *SegmentManager
}
func (task *createIndexTask) Type() internalpb.MsgType {
return internalpb.MsgType_kCreateIndex
}
func (task *createIndexTask) Ts() (Timestamp, error) {
return task.req.Timestamp, nil
}
func (task *createIndexTask) Execute() error {
// modify schema
if err := task.mt.UpdateFieldIndexParams(task.req.CollectionName, task.req.FieldName, task.req.ExtraParams); err != nil {
return err
}
// check if closed segment has the same index build history
collMeta, err := task.mt.GetCollectionByName(task.req.CollectionName)
if err != nil {
return err
}
var fieldID int64 = -1
for _, fieldSchema := range collMeta.Schema.Fields {
if fieldSchema.Name == task.req.FieldName {
fieldID = fieldSchema.FieldID
break
}
}
if fieldID == -1 {
return fmt.Errorf("can not find field name %s", task.req.FieldName)
}
for _, segID := range collMeta.SegmentIDs {
segMeta, err := task.mt.GetSegmentByID(segID)
if err != nil {
return err
}
if segMeta.CloseTime == 0 {
continue
}
hasIndexMeta, err := task.mt.HasFieldIndexMeta(segID, fieldID, task.req.ExtraParams)
if err != nil {
return err
}
if hasIndexMeta {
// load index
indexMeta, err := task.mt.GetFieldIndexMeta(segID, fieldID, task.req.ExtraParams)
if err != nil {
return err
}
err = task.indexLoadScheduler.Enqueue(&IndexLoadInfo{
segmentID: segID,
fieldID: fieldID,
fieldName: task.req.FieldName,
indexFilePaths: indexMeta.IndexFilePaths,
})
if err != nil {
return err
}
} else {
// create index
for _, kv := range segMeta.BinlogFilePaths {
if kv.FieldID != fieldID {
continue
}
err := task.indexBuildScheduler.Enqueue(&IndexBuildInfo{
segmentID: segID,
fieldID: fieldID,
binlogFilePath: kv.BinlogFiles,
})
if err != nil {
return err
}
break
}
}
}
// close unfilled segment
return task.segManager.ForceClose(collMeta.ID)
}
......@@ -524,7 +524,7 @@ func (mt *metaTable) saveFieldIndexMetaToEtcd(meta *pb.FieldIndexMeta) error {
return mt.client.Save(key, marshaledMeta)
}
func (mt *metaTable) DeleteFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexType string, indexParams []*commonpb.KeyValuePair) error {
func (mt *metaTable) DeleteFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) error {
mt.indexLock.Lock()
defer mt.indexLock.Unlock()
......@@ -568,6 +568,22 @@ func (mt *metaTable) HasFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexPa
return false, nil
}
func (mt *metaTable) GetFieldIndexMeta(segID UniqueID, fieldID UniqueID, indexParams []*commonpb.KeyValuePair) (*pb.FieldIndexMeta, error) {
mt.indexLock.RLock()
defer mt.indexLock.RUnlock()
if _, ok := mt.segID2IndexMetas[segID]; !ok {
return nil, fmt.Errorf("can not find segment %d", segID)
}
for _, v := range mt.segID2IndexMetas[segID] {
if v.FieldID == fieldID && typeutil.CompareIndexParams(v.IndexParams, indexParams) {
return &v, nil
}
}
return nil, fmt.Errorf("can not find field %d", fieldID)
}
func (mt *metaTable) UpdateFieldIndexMeta(meta *pb.FieldIndexMeta) error {
mt.indexLock.Lock()
defer mt.indexLock.Unlock()
......@@ -635,3 +651,30 @@ func (mt *metaTable) GetFieldIndexParams(collID UniqueID, fieldID UniqueID) ([]*
}
return nil, fmt.Errorf("can not find field %d in collection %d", fieldID, collID)
}
func (mt *metaTable) UpdateFieldIndexParams(collName string, fieldName string, indexParams []*commonpb.KeyValuePair) error {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
vid, ok := mt.collName2ID[collName]
if !ok {
return errors.Errorf("can't find collection: " + collName)
}
meta, ok := mt.collID2Meta[vid]
if !ok {
return errors.Errorf("can't find collection: " + collName)
}
for _, fieldSchema := range meta.Schema.Fields {
if fieldSchema.Name == fieldName {
fieldSchema.IndexParams = indexParams
if err := mt.saveCollectionMeta(&meta); err != nil {
_ = mt.reloadFromKV()
return err
}
return nil
}
}
return fmt.Errorf("can not find field with id %s", fieldName)
}
......@@ -497,7 +497,7 @@ func TestMetaTable_IndexMeta(t *testing.T) {
})
assert.Nil(t, err)
assert.EqualValues(t, indexbuilderpb.IndexStatus_FINISHED, meta.segID2IndexMetas[1][0].Status)
err = meta.DeleteFieldIndexMeta(1, 100, "type1", []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
err = meta.DeleteFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
assert.Nil(t, err)
res, err = meta.HasFieldIndexMeta(1, 100, []*commonpb.KeyValuePair{{Key: "k1", Value: "v1"}})
assert.Nil(t, err)
......
......@@ -358,6 +358,20 @@ func (manager *SegmentManager) initChannelRanges() error {
}
return nil
}
// ForceClose set segments of collection with collID closable, segment will be closed after the assignments of it has expired
func (manager *SegmentManager) ForceClose(collID UniqueID) error {
status, ok := manager.collStatus[collID]
if !ok {
return nil
}
for _, segStatus := range status.segments {
segStatus.closable = true
}
return nil
}
func NewSegmentManager(ctx context.Context,
meta *metaTable,
globalIDAllocator func() (UniqueID, error),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册