未验证 提交 be688d89 编写于 作者: J Jiquan Long 提交者: GitHub

Forbid multiple indexes on same field (#16875)

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 4ee78eb5
......@@ -1036,13 +1036,77 @@ func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema
return exist
}
func (mt *MetaTable) unlockGetCollectionInfo(collName string) (pb.CollectionInfo, error) {
collID, ok := mt.collName2ID[collName]
if !ok {
collID, ok = mt.collAlias2ID[collName]
if !ok {
return pb.CollectionInfo{}, fmt.Errorf("collection not found: %s", collName)
}
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return pb.CollectionInfo{}, fmt.Errorf("collection not found: %s", collName)
}
return collMeta, nil
}
func (mt *MetaTable) checkFieldCanBeIndexed(collMeta pb.CollectionInfo, fieldSchema schemapb.FieldSchema, idxInfo *pb.IndexInfo) error {
for _, f := range collMeta.FieldIndexes {
if f.GetFiledID() == fieldSchema.GetFieldID() {
if info, ok := mt.indexID2Meta[f.GetIndexID()]; ok {
if idxInfo.GetIndexName() != info.GetIndexName() {
return fmt.Errorf(
"creating multiple indexes on same field is not supported, "+
"collection: %s, field: %s, index name: %s, new index name: %s",
collMeta.GetSchema().GetName(), fieldSchema.GetName(),
info.GetIndexName(), idxInfo.GetIndexName())
}
} else {
// TODO: unexpected: what if index id not exist? Meta incomplete.
log.Warn("index meta was incomplete, index id missing in indexID2Meta",
zap.String("collection", collMeta.GetSchema().GetName()),
zap.String("field", fieldSchema.GetName()),
zap.Int64("collection id", collMeta.GetID()),
zap.Int64("field id", fieldSchema.GetFieldID()),
zap.Int64("index id", f.GetIndexID()))
}
}
}
return nil
}
func (mt *MetaTable) checkFieldIndexDuplicate(collMeta pb.CollectionInfo, fieldSchema schemapb.FieldSchema, idxInfo *pb.IndexInfo) (duplicate bool, err error) {
for _, f := range collMeta.FieldIndexes {
if info, ok := mt.indexID2Meta[f.IndexID]; ok {
if info.IndexName == idxInfo.IndexName {
// the index name must be different for different indexes
if f.FiledID != fieldSchema.FieldID || !EqualKeyPairArray(info.IndexParams, idxInfo.IndexParams) {
return false, fmt.Errorf("index already exists, collection: %s, field: %s, index: %s", collMeta.GetSchema().GetName(), fieldSchema.GetName(), idxInfo.GetIndexName())
}
// same index name, index params, and fieldId
return true, nil
}
}
}
return false, nil
}
// GetNotIndexedSegments return segment ids which have no index
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
mt.ddLock.Lock()
defer mt.ddLock.Unlock()
collMeta, err := mt.unlockGetCollectionInfo(collName)
if err != nil {
// error here if collection not found.
return nil, schemapb.FieldSchema{}, err
}
fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
if err != nil {
// error here if field not found.
return nil, fieldSchema, err
}
......@@ -1059,31 +1123,15 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
if idxInfo.IndexParams == nil {
return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
}
collID, ok := mt.collName2ID[collName]
if !ok {
collID, ok = mt.collAlias2ID[collName]
if !ok {
return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
}
}
collMeta, ok := mt.collID2Meta[collID]
if !ok {
return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
}
dupIdx := false
for _, f := range collMeta.FieldIndexes {
if info, ok := mt.indexID2Meta[f.IndexID]; ok {
if info.IndexName == idxInfo.IndexName {
// the index name must be different for different indexes
if !EqualKeyPairArray(info.IndexParams, idxInfo.IndexParams) || f.FiledID != fieldSchema.FieldID {
return nil, schemapb.FieldSchema{}, fmt.Errorf("index name(%s) has been exist in collectio(%s), field(%s)", info.IndexName, collName, fieldName)
}
if err := mt.checkFieldCanBeIndexed(collMeta, fieldSchema, idxInfo); err != nil {
return nil, schemapb.FieldSchema{}, err
}
// same index name, index params, and fieldId
dupIdx = true
}
}
dupIdx, err := mt.checkFieldIndexDuplicate(collMeta, fieldSchema, idxInfo)
if err != nil {
// error here if index already exists.
return nil, fieldSchema, err
}
// if no same index exist, save new index info to etcd
......@@ -1101,7 +1149,7 @@ func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, id
return nil, schemapb.FieldSchema{}, fmt.Errorf("metaTable GetNotIndexedSegments Marshal collMeta fail key:%s, err:%w", k1, err)
}
k2 := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collID, idx.IndexID)
k2 := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, idx.IndexID)
//k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
v2, err := proto.Marshal(idxInfo)
if err != nil {
......
......@@ -194,6 +194,7 @@ func TestMetaTable(t *testing.T) {
indexID = typeutil.UniqueID(10000)
indexID2 = typeutil.UniqueID(10001)
buildID = typeutil.UniqueID(201)
indexName = "testColl_index_110"
)
rand.Seed(time.Now().UnixNano())
......@@ -266,7 +267,7 @@ func TestMetaTable(t *testing.T) {
}
idxInfo := []*pb.IndexInfo{
{
IndexName: "testColl_index_110",
IndexName: indexName,
IndexID: indexID,
IndexParams: []*commonpb.KeyValuePair{
{
......@@ -424,16 +425,6 @@ func TestMetaTable(t *testing.T) {
},
}
tparams := []*commonpb.KeyValuePair{
{
Key: "field110-k1",
Value: "field110-v1",
},
{
Key: "field110-k2",
Value: "field110-v2",
},
}
idxInfo := &pb.IndexInfo{
IndexName: "field110",
IndexID: 2000,
......@@ -442,11 +433,8 @@ func TestMetaTable(t *testing.T) {
_, _, err := mt.GetNotIndexedSegments("collTest", "field110", idxInfo, nil)
assert.NotNil(t, err)
seg, field, err := mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2})
assert.Nil(t, err)
assert.Equal(t, 1, len(seg))
assert.Equal(t, segID2, seg[0])
assert.True(t, EqualKeyPairArray(field.TypeParams, tparams))
_, _, err = mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2})
assert.NotNil(t, err)
params = []*commonpb.KeyValuePair{
{
......@@ -458,21 +446,17 @@ func TestMetaTable(t *testing.T) {
idxInfo.IndexID = 2001
idxInfo.IndexName = "field110-1"
seg, field, err = mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2})
assert.Nil(t, err)
assert.Equal(t, 2, len(seg))
assert.Equal(t, segID, seg[0])
assert.Equal(t, segID2, seg[1])
assert.True(t, EqualKeyPairArray(field.TypeParams, tparams))
_, _, err = mt.GetNotIndexedSegments(collName, "field110", idxInfo, []typeutil.UniqueID{segID, segID2})
assert.NotNil(t, err)
})
wg.Add(1)
t.Run("get index by name", func(t *testing.T) {
defer wg.Done()
_, idx, err := mt.GetIndexByName(collName, "field110")
_, idx, err := mt.GetIndexByName(collName, indexName)
assert.Nil(t, err)
assert.Equal(t, 1, len(idx))
assert.Equal(t, int64(2000), idx[0].IndexID)
assert.Equal(t, idxInfo[0].IndexID, idx[0].IndexID)
params := []*commonpb.KeyValuePair{
{
Key: "field110-i1",
......@@ -506,10 +490,10 @@ func TestMetaTable(t *testing.T) {
wg.Add(1)
t.Run("drop index", func(t *testing.T) {
defer wg.Done()
idx, ok, err := mt.DropIndex(collName, "field110", "field110")
idx, ok, err := mt.DropIndex(collName, "field110", indexName)
assert.Nil(t, err)
assert.True(t, ok)
assert.Equal(t, int64(2000), idx)
assert.Equal(t, idxInfo[0].IndexID, idx)
_, ok, err = mt.DropIndex(collName, "field110", "field110-error")
assert.Nil(t, err)
......@@ -521,8 +505,7 @@ func TestMetaTable(t *testing.T) {
_, idxs, err = mt.GetIndexByName(collName, "field110-1")
assert.Nil(t, err)
assert.Equal(t, len(idxs), 1)
assert.Equal(t, idxs[0].IndexID, int64(2001))
assert.Zero(t, len(idxs))
_, err = mt.GetSegmentIndexInfoByID(segID, -1, "")
assert.NotNil(t, err)
......@@ -1021,7 +1004,6 @@ func TestMetaTable(t *testing.T) {
mt.collName2ID["abc"] = 123
_, _, err = mt.GetNotIndexedSegments("abc", "no-field", idx, nil)
assert.NotNil(t, err)
assert.EqualError(t, err, "collection abc not found")
mockKV.multiSave = func(kvs map[string]string, ts typeutil.Timestamp) error {
return nil
......@@ -1045,39 +1027,13 @@ func TestMetaTable(t *testing.T) {
bakMeta := mt.indexID2Meta
mt.indexID2Meta = make(map[int64]pb.IndexInfo)
_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
assert.Nil(t, err)
mt.indexID2Meta = bakMeta
mockTxnKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) })
//_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
mockTxnKV.multiSave = func(kvs map[string]string) error {
return nil
}
collInfo.PartitionIDs = nil
collInfo.PartitionNames = nil
collInfo.PartitionCreatedTimestamps = nil
//err = mt.AddCollection(collInfo, ts, idxInfo, nil)
//assert.Nil(t, err)
coll, ok := mt.collID2Meta[collInfo.ID]
assert.True(t, ok)
coll.FieldIndexes = append(coll.FieldIndexes, &pb.FieldIndexInfo{FiledID: coll.FieldIndexes[0].FiledID, IndexID: coll.FieldIndexes[0].IndexID + 1})
mt.collID2Meta[coll.ID] = coll
idx.IndexName = "no-index-2"
mockTxnKV.multiSave = func(kvs map[string]string) error {
return fmt.Errorf("multi save error")
}
assert.Panics(t, func() { mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil) })
//_, _, err = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idx, nil)
//assert.NotNil(t, err)
//assert.EqualError(t, err, "multi save error")
assert.Panics(t, func() {
_, _, _ = mt.GetNotIndexedSegments(collInfo.Schema.Name, collInfo.Schema.Fields[0].Name, idxInfo[0], []UniqueID{10001, 10002})
})
mt.indexID2Meta = bakMeta
})
wg.Add(1)
......@@ -1378,3 +1334,158 @@ func TestMetaTable_GetSegmentIndexInfos(t *testing.T) {
assert.Equal(t, typeutil.UniqueID(6), indexInfos.GetBuildID())
assert.Equal(t, true, indexInfos.GetEnableIndex())
}
func TestMetaTable_unlockGetCollectionInfo(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
mt := &MetaTable{
collName2ID: map[string]typeutil.UniqueID{"test": 100},
collID2Meta: map[typeutil.UniqueID]pb.CollectionInfo{
100: {ID: 100, Schema: &schemapb.CollectionSchema{Name: "test"}},
},
}
info, err := mt.unlockGetCollectionInfo("test")
assert.NoError(t, err)
assert.Equal(t, UniqueID(100), info.ID)
assert.Equal(t, "test", info.GetSchema().GetName())
})
t.Run("collection name not found", func(t *testing.T) {
mt := &MetaTable{collName2ID: nil, collAlias2ID: nil}
_, err := mt.unlockGetCollectionInfo("test")
assert.Error(t, err)
})
t.Run("name found, meta not found", func(t *testing.T) {
mt := &MetaTable{
collName2ID: map[string]typeutil.UniqueID{"test": 100},
collAlias2ID: nil,
collID2Meta: nil,
}
_, err := mt.unlockGetCollectionInfo("test")
assert.Error(t, err)
})
t.Run("alias found, meta not found", func(t *testing.T) {
mt := &MetaTable{
collName2ID: nil,
collAlias2ID: map[string]typeutil.UniqueID{"test": 100},
collID2Meta: nil,
}
_, err := mt.unlockGetCollectionInfo("test")
assert.Error(t, err)
})
}
func TestMetaTable_checkFieldCanBeIndexed(t *testing.T) {
t.Run("field not indexed", func(t *testing.T) {
mt := &MetaTable{}
collMeta := pb.CollectionInfo{
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{
FieldID: 101,
}
idxInfo := &pb.IndexInfo{}
err := mt.checkFieldCanBeIndexed(collMeta, fieldSchema, idxInfo)
assert.NoError(t, err)
})
t.Run("field already indexed", func(t *testing.T) {
mt := &MetaTable{
indexID2Meta: map[typeutil.UniqueID]pb.IndexInfo{
200: {IndexID: 200, IndexName: "test"},
},
}
collMeta := pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{Name: "test"},
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{Name: "test", FieldID: 100}
idxInfo := &pb.IndexInfo{IndexName: "not_test"}
err := mt.checkFieldCanBeIndexed(collMeta, fieldSchema, idxInfo)
assert.Error(t, err)
})
t.Run("unexpected", func(t *testing.T) {
mt := &MetaTable{
// index meta incomplete.
indexID2Meta: map[typeutil.UniqueID]pb.IndexInfo{},
}
collMeta := pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{Name: "test"},
ID: 1000,
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{Name: "test", FieldID: 100}
idxInfo := &pb.IndexInfo{IndexName: "not_test"}
err := mt.checkFieldCanBeIndexed(collMeta, fieldSchema, idxInfo)
assert.NoError(t, err)
})
}
func TestMetaTable_checkFieldIndexDuplicate(t *testing.T) {
t.Run("index already exists", func(t *testing.T) {
mt := &MetaTable{
indexID2Meta: map[typeutil.UniqueID]pb.IndexInfo{
200: {IndexID: 200, IndexName: "test"},
},
}
collMeta := pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{Name: "test"},
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{Name: "test", FieldID: 101}
idxInfo := &pb.IndexInfo{IndexName: "test"}
_, err := mt.checkFieldIndexDuplicate(collMeta, fieldSchema, idxInfo)
assert.Error(t, err)
})
t.Run("index parameters mismatch", func(t *testing.T) {
mt := &MetaTable{
indexID2Meta: map[typeutil.UniqueID]pb.IndexInfo{
200: {IndexID: 200, IndexName: "test",
IndexParams: []*commonpb.KeyValuePair{{Key: "Key", Value: "Value"}}},
},
}
collMeta := pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{Name: "test"},
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{Name: "test", FieldID: 100}
idxInfo := &pb.IndexInfo{IndexName: "test", IndexParams: []*commonpb.KeyValuePair{{Key: "Key", Value: "not_Value"}}}
_, err := mt.checkFieldIndexDuplicate(collMeta, fieldSchema, idxInfo)
assert.Error(t, err)
})
t.Run("index parameters match", func(t *testing.T) {
mt := &MetaTable{
indexID2Meta: map[typeutil.UniqueID]pb.IndexInfo{
200: {IndexID: 200, IndexName: "test",
IndexParams: []*commonpb.KeyValuePair{{Key: "Key", Value: "Value"}}},
},
}
collMeta := pb.CollectionInfo{
Schema: &schemapb.CollectionSchema{Name: "test"},
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{Name: "test", FieldID: 100}
idxInfo := &pb.IndexInfo{IndexName: "test", IndexParams: []*commonpb.KeyValuePair{{Key: "Key", Value: "Value"}}}
duplicate, err := mt.checkFieldIndexDuplicate(collMeta, fieldSchema, idxInfo)
assert.NoError(t, err)
assert.True(t, duplicate)
})
t.Run("field not found", func(t *testing.T) {
mt := &MetaTable{}
collMeta := pb.CollectionInfo{
FieldIndexes: []*pb.FieldIndexInfo{{FiledID: 100, IndexID: 200}},
}
fieldSchema := schemapb.FieldSchema{
FieldID: 101,
}
idxInfo := &pb.IndexInfo{}
duplicate, err := mt.checkFieldIndexDuplicate(collMeta, fieldSchema, idxInfo)
assert.NoError(t, err)
assert.False(t, duplicate)
})
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册