meta_table.go 42.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package rootcoord
13 14

import (
S
sunby 已提交
15
	"fmt"
Z
zhenshan.cao 已提交
16
	"path"
17 18 19 20
	"strconv"
	"sync"

	"github.com/golang/protobuf/proto"
B
bigsheeper 已提交
21 22
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
29 30
)

Z
zhenshan.cao 已提交
31
const (
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
	// ComponentPrefix prefix for rootcoord component
	ComponentPrefix = "root-coord"

	// TenantMetaPrefix prefix for tenant meta
	TenantMetaPrefix = ComponentPrefix + "/tenant"

	// ProxyMetaPrefix prefix for proxy meta
	ProxyMetaPrefix = ComponentPrefix + "/proxy"

	// CollectionMetaPrefix prefix for collection meta
	CollectionMetaPrefix = ComponentPrefix + "/collection"

	// SegmentIndexMetaPrefix prefix for segment index meta
	SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"

	// IndexMetaPrefix prefix for index meta
	IndexMetaPrefix = ComponentPrefix + "/index"

	// CollectionAliasMetaPrefix prefix for collection alias meta
Y
Yusup 已提交
51
	CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias"
52

53
	// TimestampPrefix prefix for timestamp
54 55
	TimestampPrefix = ComponentPrefix + "/timestamp"

56
	// DDOperationPrefix prefix for DD operation
57
	DDOperationPrefix = ComponentPrefix + "/dd-operation"
58

59 60 61 62
	// DDMsgSendPrefix prefix to indicate whether DD msg has been send
	DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"

	// CreateCollectionDDType name of DD type for create collection
63
	CreateCollectionDDType = "CreateCollection"
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81

	// DropCollectionDDType name of DD type for drop collection
	DropCollectionDDType = "DropCollection"

	// CreatePartitionDDType name of DD type for create partition
	CreatePartitionDDType = "CreatePartition"

	// DropPartitionDDType name of DD type for drop partition
	DropPartitionDDType = "DropPartition"

	// CreateAliasDDType name of DD type for create collection alias
	CreateAliasDDType = "CreateAlias"

	// DropAliasDDType name of DD type for drop collection alias
	DropAliasDDType = "DropAlias"

	// AlterAliasDDType name of DD type for alter collection alias
	AlterAliasDDType = "AlterAlias"
Z
zhenshan.cao 已提交
82 83
)

84 85
// MetaTable store all rootcoord meta info
type MetaTable struct {
86 87 88 89 90
	client          kv.SnapShotKV                                                   // client of a reliable kv service, i.e. etcd client
	tenantID2Meta   map[typeutil.UniqueID]pb.TenantMeta                             // tenant id to tenant meta
	proxyID2Meta    map[typeutil.UniqueID]pb.ProxyMeta                              // proxy id to proxy meta
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection name to collection id
Y
Yusup 已提交
91
	collAlias2ID    map[string]typeutil.UniqueID                                    // collection alias to collection id
92
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
93 94
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // collection_id/index_id -> meta
95 96 97 98 99 100

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
}

101 102 103 104
// NewMetaTable create meta table for rootcoord, which stores all in-memory information
// for collection, partion, segment, index etc.
func NewMetaTable(kv kv.SnapShotKV) (*MetaTable, error) {
	mt := &MetaTable{
105 106 107 108 109 110 111 112 113 114 115 116
		client:     kv,
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

117
func (mt *MetaTable) reloadFromKV() error {
118 119
	mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
	mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
Z
zhenshan.cao 已提交
120
	mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
121
	mt.collName2ID = make(map[string]typeutil.UniqueID)
Y
Yusup 已提交
122
	mt.collAlias2ID = make(map[string]typeutil.UniqueID)
123
	mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
124
	mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
Z
zhenshan.cao 已提交
125
	mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
126

127
	_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
128 129 130 131 132 133
	if err != nil {
		return err
	}

	for _, value := range values {
		tenantMeta := pb.TenantMeta{}
134
		err := proto.Unmarshal([]byte(value), &tenantMeta)
135
		if err != nil {
136
			return fmt.Errorf("RootCoord Unmarshal pb.TenantMeta err:%w", err)
137 138 139 140
		}
		mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
	}

141
	_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
142 143 144 145 146 147
	if err != nil {
		return err
	}

	for _, value := range values {
		proxyMeta := pb.ProxyMeta{}
148
		err = proto.Unmarshal([]byte(value), &proxyMeta)
149
		if err != nil {
150
			return fmt.Errorf("RootCoord Unmarshal pb.ProxyMeta err:%w", err)
151 152 153 154
		}
		mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
	}

155
	_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
156 157 158 159 160
	if err != nil {
		return err
	}

	for _, value := range values {
161
		collInfo := pb.CollectionInfo{}
162
		err = proto.Unmarshal([]byte(value), &collInfo)
163
		if err != nil {
164
			return fmt.Errorf("RootCoord Unmarshal pb.CollectionInfo err:%w", err)
165
		}
166 167
		mt.collID2Meta[collInfo.ID] = collInfo
		mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
168 169
	}

170
	_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
171 172 173
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
174 175
	for _, value := range values {
		segmentIndexInfo := pb.SegmentIndexInfo{}
176
		err = proto.Unmarshal([]byte(value), &segmentIndexInfo)
177
		if err != nil {
178
			return fmt.Errorf("RootCoord Unmarshal pb.SegmentIndexInfo err:%w", err)
179
		}
180 181 182 183 184 185 186 187 188 189 190 191

		// update partID2SegID
		segIDMap, ok := mt.partID2SegID[segmentIndexInfo.PartitionID]
		if ok {
			segIDMap[segmentIndexInfo.SegmentID] = true
		} else {
			idMap := make(map[typeutil.UniqueID]bool)
			idMap[segmentIndexInfo.SegmentID] = true
			mt.partID2SegID[segmentIndexInfo.PartitionID] = idMap
		}

		// update segID2IndexMeta
Z
zhenshan.cao 已提交
192
		idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
193
		if ok {
194
			idx[segmentIndexInfo.IndexID] = segmentIndexInfo
Z
zhenshan.cao 已提交
195 196 197
		} else {
			meta := make(map[typeutil.UniqueID]pb.SegmentIndexInfo)
			meta[segmentIndexInfo.IndexID] = segmentIndexInfo
198
			mt.segID2IndexMeta[segmentIndexInfo.SegmentID] = meta
199 200 201
		}
	}

202
	_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
Z
zhenshan.cao 已提交
203 204
	if err != nil {
		return err
205
	}
Z
zhenshan.cao 已提交
206 207
	for _, value := range values {
		meta := pb.IndexInfo{}
208
		err = proto.Unmarshal([]byte(value), &meta)
Z
zhenshan.cao 已提交
209
		if err != nil {
210
			return fmt.Errorf("RootCoord Unmarshal pb.IndexInfo err:%w", err)
211
		}
Z
zhenshan.cao 已提交
212
		mt.indexID2Meta[meta.IndexID] = meta
213 214
	}

Y
Yusup 已提交
215 216 217 218 219 220
	_, values, err = mt.client.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
	if err != nil {
		return err
	}
	for _, value := range values {
		aliasInfo := pb.CollectionInfo{}
221
		err = proto.Unmarshal([]byte(value), &aliasInfo)
Y
Yusup 已提交
222
		if err != nil {
223
			return fmt.Errorf("RootCoord Unmarshal pb.AliasInfo err:%w", err)
Y
Yusup 已提交
224 225 226 227
		}
		mt.collAlias2ID[aliasInfo.Schema.Name] = aliasInfo.ID
	}

228
	log.Debug("reload meta table from KV successfully")
Z
zhenshan.cao 已提交
229
	return nil
230 231
}

232
func (mt *MetaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
N
neza2017 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245
	if op == nil {
		return nil
	}
	meta[DDMsgSendPrefix] = "false"
	return func(ts typeutil.Timestamp) (string, string, error) {
		val, err := op(ts)
		if err != nil {
			return "", "", err
		}
		return DDOperationPrefix, val, nil
	}
}

246 247
// AddTenant add tenant
func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
248 249 250 251
	mt.tenantLock.Lock()
	defer mt.tenantLock.Unlock()

	k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
252 253 254 255 256
	v, err := proto.Marshal(te)
	if err != nil {
		log.Error("AddTenant Marshal fail", zap.Error(err))
		return err
	}
N
neza2017 已提交
257

258
	err = mt.client.Save(k, string(v), ts)
259
	if err != nil {
260 261
		log.Error("AddTenant Save fail", zap.Error(err))
		return err
N
neza2017 已提交
262 263
	}
	mt.tenantID2Meta[te.ID] = *te
264
	return nil
N
neza2017 已提交
265 266
}

267 268
// AddProxy add proxy
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
269 270 271 272
	mt.proxyLock.Lock()
	defer mt.proxyLock.Unlock()

	k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
273 274 275 276 277
	v, err := proto.Marshal(po)
	if err != nil {
		log.Error("AddProxy Marshal fail", zap.Error(err))
		return err
	}
N
neza2017 已提交
278

279
	err = mt.client.Save(k, string(v), ts)
280
	if err != nil {
281 282
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
283 284
	}
	mt.proxyID2Meta[po.ID] = *po
285
	return nil
N
neza2017 已提交
286 287
}

288 289
// AddCollection add collection
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
290 291
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
Z
zhenshan.cao 已提交
292

293 294
	if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
		len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
295
		(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
296
		return fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
297
	}
298
	if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
299
		return fmt.Errorf("collection %s exist", coll.Schema.Name)
300
	}
N
neza2017 已提交
301
	if len(coll.FieldIndexes) != len(idx) {
302
		return fmt.Errorf("incorrect index id when creating collection")
N
neza2017 已提交
303
	}
304

N
neza2017 已提交
305 306 307
	for _, i := range idx {
		mt.indexID2Meta[i.IndexID] = *i
	}
Z
zhenshan.cao 已提交
308

309
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
310

N
neza2017 已提交
311
	for _, i := range idx {
N
neza2017 已提交
312
		k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
313 314 315 316 317 318
		v, err := proto.Marshal(i)
		if err != nil {
			log.Error("MetaTable AddCollection Marshal fail", zap.String("key", k),
				zap.String("IndexName", i.IndexName), zap.Error(err))
			return fmt.Errorf("MetaTable AddCollection Marshal fail key:%s, err:%w", k, err)
		}
319
		meta[k] = string(v)
N
neza2017 已提交
320 321
	}

322
	// save ddOpStr into etcd
N
neza2017 已提交
323
	addition := mt.getAdditionKV(ddOpStr, meta)
324 325 326 327 328 329 330 331
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.CreateTime = ts
		if len(coll.PartitionCreatedTimestamps) == 1 {
			coll.PartitionCreatedTimestamps[0] = ts
		}
		mt.collID2Meta[coll.ID] = *coll
		mt.collName2ID[coll.Schema.Name] = coll.ID
		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
332 333 334 335 336 337
		v1, err := proto.Marshal(coll)
		if err != nil {
			log.Error("MetaTable AddCollection saveColl Marshal fail",
				zap.String("key", k1), zap.Error(err))
			return "", "", fmt.Errorf("MetaTable AddCollection saveColl Marshal fail key:%s, err:%w", k1, err)
		}
338 339
		meta[k1] = string(v1)
		return k1, string(v1), nil
340 341
	}

342
	err := mt.client.MultiSave(meta, ts, addition, saveColl)
343
	if err != nil {
344 345
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
346
	}
347

348
	return nil
349 350
}

351 352
// DeleteCollection delete collection
func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
353 354 355 356 357
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
358
		return fmt.Errorf("can't find collection. id = %d", collID)
359 360
	}

Z
zhenshan.cao 已提交
361 362
	delete(mt.collID2Meta, collID)
	delete(mt.collName2ID, collMeta.Schema.Name)
363 364 365 366 367 368 369

	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				delete(mt.segID2IndexMeta, segID)
			}
Z
zhenshan.cao 已提交
370 371
		}
	}
372 373 374 375 376 377

	// update partID2SegID
	for partID := range collMeta.PartitionIDs {
		delete(mt.partID2SegID, typeutil.UniqueID(partID))
	}

N
neza2017 已提交
378 379 380 381 382 383 384 385
	for _, idxInfo := range collMeta.FieldIndexes {
		_, ok := mt.indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
			continue
		}
		delete(mt.indexID2Meta, idxInfo.IndexID)
	}
Y
Yusup 已提交
386 387 388 389 390 391 392
	var aliases []string
	// delete collection aliases
	for alias, cid := range mt.collAlias2ID {
		if cid == collID {
			aliases = append(aliases, alias)
		}
	}
393

394
	delMetakeys := []string{
N
neza2017 已提交
395 396 397 398
		fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
		fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
		fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
	}
399

Y
Yusup 已提交
400 401 402 403 404 405 406
	for _, alias := range aliases {
		delete(mt.collAlias2ID, alias)
		delMetakeys = append(delMetakeys,
			fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
		)
	}

407
	// save ddOpStr into etcd
N
neza2017 已提交
408 409
	var saveMeta = map[string]string{}
	addition := mt.getAdditionKV(ddOpStr, saveMeta)
410
	err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition)
411
	if err != nil {
412 413
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
414 415
	}

416
	return nil
417 418
}

419 420
// HasCollection return collection existence
func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
421 422
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
423 424 425 426 427 428 429
	if ts == 0 {
		_, ok := mt.collID2Meta[collID]
		return ok
	}
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	_, err := mt.client.Load(key, ts)
	return err == nil
430 431
}

432 433
// GetCollectionByID return collection meta by collection id
func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
N
neza2017 已提交
434 435 436
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

437 438 439 440 441 442 443
	if ts == 0 {
		col, ok := mt.collID2Meta[collectionID]
		if !ok {
			return nil, fmt.Errorf("can't find collection id : %d", collectionID)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
444
	}
445 446 447 448 449 450
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
	val, err := mt.client.Load(key, ts)
	if err != nil {
		return nil, err
	}
	colMeta := pb.CollectionInfo{}
451
	err = proto.Unmarshal([]byte(val), &colMeta)
452 453 454 455
	if err != nil {
		return nil, err
	}
	return &colMeta, nil
N
neza2017 已提交
456 457
}

458 459
// GetCollectionByName return collection meta by collection name
func (mt *MetaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
460 461 462
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

463 464 465
	if ts == 0 {
		vid, ok := mt.collName2ID[collectionName]
		if !ok {
Y
Yusup 已提交
466 467 468
			if vid, ok = mt.collAlias2ID[collectionName]; !ok {
				return nil, fmt.Errorf("can't find collection: " + collectionName)
			}
469 470 471
		}
		col, ok := mt.collID2Meta[vid]
		if !ok {
S
sunby 已提交
472
			return nil, fmt.Errorf("can't find collection %s with id %d", collectionName, vid)
473 474 475
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
476
	}
477 478 479
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
		return nil, err
N
neza2017 已提交
480
	}
481 482
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
483
		err = proto.Unmarshal([]byte(val), &collMeta)
484 485 486 487 488 489 490 491 492
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
			continue
		}
		if collMeta.Schema.Name == collectionName {
			return &collMeta, nil
		}
	}
	return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
N
neza2017 已提交
493 494
}

495 496
// ListCollections list all collection names
func (mt *MetaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
497 498
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
499
	colls := make(map[string]*pb.CollectionInfo)
500

501
	if ts == 0 {
502 503 504 505
		for collName, collID := range mt.collName2ID {
			coll := mt.collID2Meta[collID]
			colCopy := proto.Clone(&coll)
			colls[collName] = colCopy.(*pb.CollectionInfo)
N
neza2017 已提交
506 507
		}
		return colls, nil
508 509 510
	}
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
N
neza2017 已提交
511
		log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
512
		return nil, nil
513 514 515
	}
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
516
		err := proto.Unmarshal([]byte(val), &collMeta)
517 518 519
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
		}
520
		colls[collMeta.Schema.Name] = &collMeta
521 522 523 524
	}
	return colls, nil
}

525 526
// ListAliases list all collection aliases
func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string {
527 528 529 530 531 532 533 534 535 536 537
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	var aliases []string
	for alias, cid := range mt.collAlias2ID {
		if cid == collID {
			aliases = append(aliases, alias)
		}
	}
	return aliases
}

538 539
// ListCollectionVirtualChannels list virtual channels of all collections
func (mt *MetaTable) ListCollectionVirtualChannels() []string {
540 541 542 543 544 545 546 547 548 549
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	vlist := []string{}

	for _, c := range mt.collID2Meta {
		vlist = append(vlist, c.VirtualChannelNames...)
	}
	return vlist
}

550 551
// ListCollectionPhysicalChannels list physical channels of all collections
func (mt *MetaTable) ListCollectionPhysicalChannels() []string {
552 553 554 555 556 557 558 559 560 561
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	plist := []string{}

	for _, c := range mt.collID2Meta {
		plist = append(plist, c.PhysicalChannelNames...)
	}
	return plist
}

562 563
// AddPartition add partition
func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
564 565 566 567
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collID2Meta[collID]
	if !ok {
568
		return fmt.Errorf("can't find collection. id = %d", collID)
569 570 571
	}

	// number of partition tags (except _default) should be limited to 4096 by default
N
neza2017 已提交
572
	if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
573
		return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
574
	}
575

576
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
577
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
578 579
	}

580
	if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) {
581
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
582 583 584
	}

	if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
585
		return fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
586 587
	}

588 589
	for idx := range coll.PartitionIDs {
		if coll.PartitionIDs[idx] == partitionID {
590
			return fmt.Errorf("partition id = %d already exists", partitionID)
Z
zhenshan.cao 已提交
591
		}
592
		if coll.PartitionNames[idx] == partitionName {
593
			return fmt.Errorf("partition name = %s already exists", partitionName)
594
		}
595
		// no necessary to check created timestamp
596
	}
597
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
598

599
	// save ddOpStr into etcd
N
neza2017 已提交
600
	addition := mt.getAdditionKV(ddOpStr, meta)
601

602 603 604 605 606 607 608
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
		coll.PartitionNames = append(coll.PartitionNames, partitionName)
		coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
		mt.collID2Meta[collID] = coll

		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
609 610 611 612 613 614
		v1, err := proto.Marshal(&coll)
		if err != nil {
			log.Error("MetaTable AddPartition saveColl Marshal fail",
				zap.String("key", k1), zap.Error(err))
			return "", "", fmt.Errorf("MetaTable AddPartition saveColl Marshal fail, k1:%s, err:%w", k1, err)
		}
615
		meta[k1] = string(v1)
616

617
		return k1, string(v1), nil
618 619
	}

620
	err := mt.client.MultiSave(meta, ts, addition, saveColl)
621
	if err != nil {
622 623
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
624
	}
625
	return nil
626 627
}

628 629
// GetPartitionNameByID return partition name by partition id
func (mt *MetaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
630
	if ts == 0 {
631 632
		mt.ddLock.RLock()
		defer mt.ddLock.RUnlock()
633 634
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
635
			return "", fmt.Errorf("can't find collection id = %d", collID)
636
		}
637 638
		for idx := range collMeta.PartitionIDs {
			if collMeta.PartitionIDs[idx] == partitionID {
639
				return collMeta.PartitionNames[idx], nil
640 641
			}
		}
642
		return "", fmt.Errorf("partition %d does not exist", partitionID)
643 644 645 646
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
647
		return "", err
648
	}
649
	collMeta := pb.CollectionInfo{}
650
	err = proto.Unmarshal([]byte(collVal), &collMeta)
651
	if err != nil {
652
		return "", err
653
	}
654 655
	for idx := range collMeta.PartitionIDs {
		if collMeta.PartitionIDs[idx] == partitionID {
656
			return collMeta.PartitionNames[idx], nil
657
		}
658 659 660 661
	}
	return "", fmt.Errorf("partition %d does not exist", partitionID)
}

662
func (mt *MetaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
663 664 665 666
	if ts == 0 {
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
			return 0, fmt.Errorf("can't find collection id = %d", collID)
667
		}
668
		for idx := range collMeta.PartitionIDs {
669
			if collMeta.PartitionNames[idx] == partitionName {
670 671 672 673 674 675 676 677 678 679 680
				return collMeta.PartitionIDs[idx], nil
			}
		}
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
		return 0, err
	}
	collMeta := pb.CollectionInfo{}
681
	err = proto.Unmarshal([]byte(collVal), &collMeta)
682 683 684 685
	if err != nil {
		return 0, err
	}
	for idx := range collMeta.PartitionIDs {
686
		if collMeta.PartitionNames[idx] == partitionName {
687
			return collMeta.PartitionIDs[idx], nil
688 689
		}
	}
690
	return 0, fmt.Errorf("partition %s does not exist", partitionName)
691 692
}

693 694
// GetPartitionByName return partition id by partition name
func (mt *MetaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
695 696
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
697
	return mt.getPartitionByName(collID, partitionName, ts)
698 699
}

700 701
// HasPartition check partition existence
func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
702 703
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
704
	_, err := mt.getPartitionByName(collID, partitionName, ts)
705 706 707
	return err == nil
}

708 709
// DeletePartition delete partition
func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
710 711 712
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

Z
zhenshan.cao 已提交
713
	if partitionName == Params.DefaultPartitionName {
714
		return 0, fmt.Errorf("default partition cannot be deleted")
715 716 717 718
	}

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
719
		return 0, fmt.Errorf("can't find collection id = %d", collID)
720 721 722 723 724 725
	}

	// check tag exists
	exist := false

	pd := make([]typeutil.UniqueID, 0, len(collMeta.PartitionIDs))
726
	pn := make([]string, 0, len(collMeta.PartitionNames))
727
	pts := make([]uint64, 0, len(collMeta.PartitionCreatedTimestamps))
728 729
	var partID typeutil.UniqueID
	for idx := range collMeta.PartitionIDs {
730
		if collMeta.PartitionNames[idx] == partitionName {
731 732 733 734
			partID = collMeta.PartitionIDs[idx]
			exist = true
		} else {
			pd = append(pd, collMeta.PartitionIDs[idx])
735
			pn = append(pn, collMeta.PartitionNames[idx])
736
			pts = append(pts, collMeta.PartitionCreatedTimestamps[idx])
737 738 739
		}
	}
	if !exist {
740
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
741
	}
Z
zhenshan.cao 已提交
742
	collMeta.PartitionIDs = pd
743
	collMeta.PartitionNames = pn
744
	collMeta.PartitionCreatedTimestamps = pts
Z
zhenshan.cao 已提交
745
	mt.collID2Meta[collID] = collMeta
746

747 748 749 750
	// update segID2IndexMeta and partID2SegID
	if segIDMap, ok := mt.partID2SegID[partID]; ok {
		for segID := range segIDMap {
			delete(mt.segID2IndexMeta, segID)
751 752
		}
	}
753 754
	delete(mt.partID2SegID, partID)

755
	k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
756 757 758 759 760 761
	v, err := proto.Marshal(&collMeta)
	if err != nil {
		log.Error("MetaTable DeletePartition Marshal collectionMeta fail",
			zap.String("key", k), zap.Error(err))
		return 0, fmt.Errorf("MetaTable DeletePartition Marshal collectionMeta fail key:%s, err:%w", k, err)
	}
762
	meta := map[string]string{k: string(v)}
763
	var delMetaKeys []string
N
neza2017 已提交
764
	for _, idxInfo := range collMeta.FieldIndexes {
765
		k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
N
neza2017 已提交
766 767 768
		delMetaKeys = append(delMetaKeys, k)
	}

769
	// save ddOpStr into etcd
N
neza2017 已提交
770
	addition := mt.getAdditionKV(ddOpStr, meta)
771

772
	err = mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
773
	if err != nil {
774 775
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
776
	}
777
	return partID, nil
778 779
}

780 781
// AddIndex add index
func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
782 783
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
784

785
	collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
786
	if !ok {
787
		return fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
788
	}
789 790 791 792 793 794
	exist := false
	for _, fidx := range collMeta.FieldIndexes {
		if fidx.IndexID == segIdxInfo.IndexID {
			exist = true
			break
		}
795
	}
796
	if !exist {
797
		return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
798
	}
799

800 801 802 803
	segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
	if !ok {
		idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
		mt.segID2IndexMeta[segIdxInfo.SegmentID] = idxMap
804 805 806

		segIDMap := map[typeutil.UniqueID]bool{segIdxInfo.SegmentID: true}
		mt.partID2SegID[segIdxInfo.PartitionID] = segIDMap
807 808 809 810 811
	} else {
		tmpInfo, ok := segIdxMap[segIdxInfo.IndexID]
		if ok {
			if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
				if segIdxInfo.BuildID == tmpInfo.BuildID {
812
					log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
813
					return nil
814
				}
815
				return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
816
			}
817 818 819
		}
	}

820
	mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
821 822
	mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true

823
	k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
824 825 826 827 828 829
	v, err := proto.Marshal(segIdxInfo)
	if err != nil {
		log.Error("MetaTable AddIndex Marshal segIdxInfo fail",
			zap.String("key", k), zap.Error(err))
		return fmt.Errorf("MetaTable AddIndex Marshal segIdxInfo fail key:%s, err:%w", k, err)
	}
N
neza2017 已提交
830

831
	err = mt.client.Save(k, string(v), ts)
N
neza2017 已提交
832
	if err != nil {
833 834
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
835
	}
836

837
	return nil
N
neza2017 已提交
838 839
}

840 841
// DropIndex drop index
func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
N
neza2017 已提交
842 843 844 845 846
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
847 848 849 850
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return 0, false, fmt.Errorf("collection name = %s not exist", collName)
		}
N
neza2017 已提交
851 852 853
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
854
		return 0, false, fmt.Errorf("collection name  = %s not has meta", collName)
N
neza2017 已提交
855 856 857
	}
	fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
	if err != nil {
858
		return 0, false, err
N
neza2017 已提交
859 860 861 862 863 864 865 866 867 868 869
	}
	fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
	var dropIdxID typeutil.UniqueID
	for i, info := range collMeta.FieldIndexes {
		if info.FiledID != fieldSch.FieldID {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		idxMeta, ok := mt.indexID2Meta[info.IndexID]
		if !ok {
			fieldIdxInfo = append(fieldIdxInfo, info)
N
neza2017 已提交
870
			log.Warn("index id not has meta", zap.Int64("index id", info.IndexID))
N
neza2017 已提交
871 872 873 874 875 876 877 878 879 880 881
			continue
		}
		if idxMeta.IndexName != indexName {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		dropIdxID = info.IndexID
		fieldIdxInfo = append(fieldIdxInfo, collMeta.FieldIndexes[i+1:]...)
		break
	}
	if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
N
neza2017 已提交
882
		log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
883
		return 0, false, nil
N
neza2017 已提交
884 885 886
	}
	collMeta.FieldIndexes = fieldIdxInfo
	mt.collID2Meta[collID] = collMeta
887
	k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
888 889 890 891 892 893
	v, err := proto.Marshal(&collMeta)
	if err != nil {
		log.Error("MetaTable DropIndex Marshal collMeta fail",
			zap.String("key", k), zap.Error(err))
		return 0, false, fmt.Errorf("MetaTable DropIndex Marshal collMeta fail key:%s, err:%w", k, err)
	}
894
	saveMeta := map[string]string{k: string(v)}
N
neza2017 已提交
895 896 897

	delete(mt.indexID2Meta, dropIdxID)

898 899 900 901 902 903 904
	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				if segIndexInfos, ok := mt.segID2IndexMeta[segID]; ok {
					delete(segIndexInfos, dropIdxID)
				}
N
neza2017 已提交
905 906 907
			}
		}
	}
908

N
neza2017 已提交
909 910 911 912
	delMeta := []string{
		fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
		fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
	}
N
neza2017 已提交
913

914
	err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, ts)
N
neza2017 已提交
915
	if err != nil {
916 917
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
N
neza2017 已提交
918 919
	}

920
	return dropIdxID, true, nil
N
neza2017 已提交
921 922
}

923 924
// GetSegmentIndexInfoByID return segment index info by segment id
func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
N
neza2017 已提交
925 926 927 928 929
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	segIdxMap, ok := mt.segID2IndexMeta[segID]
	if !ok {
930 931 932 933 934 935 936
		return pb.SegmentIndexInfo{
			SegmentID:   segID,
			FieldID:     filedID,
			IndexID:     0,
			BuildID:     0,
			EnableIndex: false,
		}, nil
N
neza2017 已提交
937
	}
938
	if len(segIdxMap) == 0 {
S
sunby 已提交
939
		return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
N
neza2017 已提交
940 941
	}

B
bigsheeper 已提交
942
	if filedID == -1 && idxName == "" { // return default index
943
		for _, seg := range segIdxMap {
B
bigsheeper 已提交
944 945 946 947
			info, ok := mt.indexID2Meta[seg.IndexID]
			if ok && info.IndexName == Params.DefaultIndexName {
				return seg, nil
			}
N
neza2017 已提交
948 949
		}
	} else {
950
		for idxID, seg := range segIdxMap {
N
neza2017 已提交
951 952 953 954 955 956 957 958 959 960 961 962
			idxMeta, ok := mt.indexID2Meta[idxID]
			if ok {
				if idxMeta.IndexName != idxName {
					continue
				}
				if seg.FieldID != filedID {
					continue
				}
				return seg, nil
			}
		}
	}
S
sunby 已提交
963
	return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
N
neza2017 已提交
964 965
}

966 967
// GetFieldSchema return field schema
func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
968 969 970
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

N
neza2017 已提交
971 972 973
	return mt.unlockGetFieldSchema(collName, fieldName)
}

974
func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
975 976
	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
977 978 979 980
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
		}
N
neza2017 已提交
981 982 983
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
984
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
985 986 987 988 989 990 991
	}

	for _, field := range collMeta.Schema.Fields {
		if field.Name == fieldName {
			return *field, nil
		}
	}
S
sunby 已提交
992
	return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
N
neza2017 已提交
993 994
}

995 996
// IsSegmentIndexed check if segment has index
func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
997 998
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
999 1000 1001
	return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}

1002
func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
1003 1004 1005 1006 1007
	segIdx, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return false
	}
	exist := false
1008
	for idxID, meta := range segIdx {
N
neza2017 已提交
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
		if meta.FieldID != fieldSchema.FieldID {
			continue
		}
		idxMeta, ok := mt.indexID2Meta[idxID]
		if !ok {
			continue
		}
		if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
			exist = true
			break
		}
	}
	return exist
}

1024 1025
// GetNotIndexedSegments return segment ids which have no index
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
N
neza2017 已提交
1026 1027 1028
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

N
neza2017 已提交
1029 1030 1031
	if idxInfo.IndexParams == nil {
		return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
	}
N
neza2017 已提交
1032 1033
	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
1034 1035 1036 1037
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
		}
N
neza2017 已提交
1038 1039 1040
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
1041
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1042
	}
N
neza2017 已提交
1043
	fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
N
neza2017 已提交
1044 1045 1046 1047
	if err != nil {
		return nil, fieldSchema, err
	}

N
neza2017 已提交
1048 1049
	var dupIdx typeutil.UniqueID = 0
	for _, f := range collMeta.FieldIndexes {
1050 1051 1052 1053
		if info, ok := mt.indexID2Meta[f.IndexID]; ok {
			if info.IndexName == idxInfo.IndexName {
				dupIdx = info.IndexID
				break
N
neza2017 已提交
1054 1055 1056
			}
		}
	}
N
neza2017 已提交
1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072

	exist := false
	var existInfo pb.IndexInfo
	for _, f := range collMeta.FieldIndexes {
		if f.FiledID == fieldSchema.FieldID {
			existInfo, ok = mt.indexID2Meta[f.IndexID]
			if !ok {
				return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
			}
			if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
				exist = true
				break
			}
		}
	}
	if !exist {
N
neza2017 已提交
1073
		idx := &pb.FieldIndexInfo{
N
neza2017 已提交
1074 1075
			FiledID: fieldSchema.FieldID,
			IndexID: idxInfo.IndexID,
N
neza2017 已提交
1076 1077
		}
		collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
N
neza2017 已提交
1078 1079
		mt.collID2Meta[collMeta.ID] = collMeta
		k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
1080 1081 1082 1083 1084 1085
		v1, err := proto.Marshal(&collMeta)
		if err != nil {
			log.Error("MetaTable GetNotIndexedSegments Marshal collMeta fail",
				zap.String("key", k1), zap.Error(err))
			return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal collMeta fail key:%s, err:%w", k1, err)
		}
N
neza2017 已提交
1086

N
neza2017 已提交
1087 1088
		mt.indexID2Meta[idx.IndexID] = *idxInfo
		k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
1089 1090 1091 1092 1093 1094
		v2, err := proto.Marshal(idxInfo)
		if err != nil {
			log.Error("MetaTable GetNotIndexedSegments Marshal idxInfo fail",
				zap.String("key", k2), zap.Error(err))
			return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal idxInfo fail key:%s, err:%w", k2, err)
		}
1095
		meta := map[string]string{k1: string(v1), k2: string(v2)}
N
neza2017 已提交
1096

N
neza2017 已提交
1097 1098 1099 1100 1101
		if dupIdx != 0 {
			dupInfo := mt.indexID2Meta[dupIdx]
			dupInfo.IndexName = dupInfo.IndexName + "_bak"
			mt.indexID2Meta[dupIdx] = dupInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
1102 1103 1104 1105 1106 1107
			v, err := proto.Marshal(&dupInfo)
			if err != nil {
				log.Error("MetaTable GetNotIndexedSegments Marshal dupInfo fail",
					zap.String("key", k), zap.Error(err))
				return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal dupInfo fail key:%s, err:%w", k, err)
			}
1108
			meta[k] = string(v)
N
neza2017 已提交
1109
		}
C
congqixia 已提交
1110
		err = mt.client.MultiSave(meta, ts)
N
neza2017 已提交
1111
		if err != nil {
1112 1113
			log.Error("SnapShotKV MultiSave fail", zap.Error(err))
			panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
1114
		}
N
neza2017 已提交
1115 1116 1117 1118 1119 1120
	} else {
		idxInfo.IndexID = existInfo.IndexID
		if existInfo.IndexName != idxInfo.IndexName { //replace index name
			existInfo.IndexName = idxInfo.IndexName
			mt.indexID2Meta[existInfo.IndexID] = existInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
1121 1122 1123 1124 1125 1126
			v, err := proto.Marshal(&existInfo)
			if err != nil {
				log.Error("MetaTable GetNotIndexedSegments Marshal existInfo fail",
					zap.String("key", k), zap.Error(err))
				return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal existInfo fail key:%s, err:%w", k, err)
			}
1127
			meta := map[string]string{k: string(v)}
N
neza2017 已提交
1128 1129 1130 1131 1132
			if dupIdx != 0 {
				dupInfo := mt.indexID2Meta[dupIdx]
				dupInfo.IndexName = dupInfo.IndexName + "_bak"
				mt.indexID2Meta[dupIdx] = dupInfo
				k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
1133 1134 1135 1136 1137 1138
				v, err := proto.Marshal(&dupInfo)
				if err != nil {
					log.Error("MetaTable GetNotIndexedSegments Marshal dupInfo fail",
						zap.String("key", k), zap.Error(err))
					return nil, schemapb.FieldSchema{}, fmt.Errorf("MetaTable GetNotIndexedSegments Marshal dupInfo fail key:%s, err:%w", k, err)
				}
1139
				meta[k] = string(v)
N
neza2017 已提交
1140 1141
			}

C
congqixia 已提交
1142
			err = mt.client.MultiSave(meta, ts)
N
neza2017 已提交
1143
			if err != nil {
1144 1145
				log.Error("SnapShotKV MultiSave fail", zap.Error(err))
				panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
1146 1147
			}
		}
N
neza2017 已提交
1148 1149
	}

N
neza2017 已提交
1150
	rstID := make([]typeutil.UniqueID, 0, 16)
1151 1152 1153
	for _, segID := range segIDs {
		if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
			rstID = append(rstID, segID)
N
neza2017 已提交
1154 1155 1156 1157 1158
		}
	}
	return rstID, fieldSchema, nil
}

1159 1160
// GetIndexByName return index info by index name
func (mt *MetaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
N
neza2017 已提交
1161
	mt.ddLock.RLock()
S
sunby 已提交
1162
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
1163 1164 1165

	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
1166 1167 1168 1169
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
		}
N
neza2017 已提交
1170 1171 1172
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
1173
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1174 1175
	}

N
neza2017 已提交
1176
	rstIndex := make([]pb.IndexInfo, 0, len(collMeta.FieldIndexes))
Z
zhenshan.cao 已提交
1177
	for _, idx := range collMeta.FieldIndexes {
1178 1179
		idxInfo, ok := mt.indexID2Meta[idx.IndexID]
		if !ok {
1180
			return pb.CollectionInfo{}, nil, fmt.Errorf("index id = %d not found", idx.IndexID)
1181 1182 1183
		}
		if indexName == "" || idxInfo.IndexName == indexName {
			rstIndex = append(rstIndex, idxInfo)
N
neza2017 已提交
1184 1185
		}
	}
1186
	return collMeta, rstIndex, nil
N
neza2017 已提交
1187
}
B
bigsheeper 已提交
1188

1189 1190
// GetIndexByID return index info by index id
func (mt *MetaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
B
bigsheeper 已提交
1191
	mt.ddLock.RLock()
S
sunby 已提交
1192
	defer mt.ddLock.RUnlock()
B
bigsheeper 已提交
1193 1194 1195

	indexInfo, ok := mt.indexID2Meta[indexID]
	if !ok {
S
sunby 已提交
1196
		return nil, fmt.Errorf("cannot find index, id = %d", indexID)
B
bigsheeper 已提交
1197 1198 1199
	}
	return &indexInfo, nil
}
1200

1201
func (mt *MetaTable) dupMeta() (
1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225
	map[typeutil.UniqueID]pb.CollectionInfo,
	map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
	map[typeutil.UniqueID]pb.IndexInfo,
) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	collID2Meta := map[typeutil.UniqueID]pb.CollectionInfo{}
	segID2IndexMeta := map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo{}
	indexID2Meta := map[typeutil.UniqueID]pb.IndexInfo{}
	for k, v := range mt.collID2Meta {
		collID2Meta[k] = v
	}
	for k, v := range mt.segID2IndexMeta {
		segID2IndexMeta[k] = map[typeutil.UniqueID]pb.SegmentIndexInfo{}
		for k2, v2 := range v {
			segID2IndexMeta[k][k2] = v2
		}
	}
	for k, v := range mt.indexID2Meta {
		indexID2Meta[k] = v
	}
	return collID2Meta, segID2IndexMeta, indexID2Meta
}
Y
Yusup 已提交
1226

1227 1228
// AddAlias add collection alias
func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string,
Y
Yusup 已提交
1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249
	ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; ok {
		return fmt.Errorf("duplicate collection alias, alias = %s", collectionAlias)
	}

	if _, ok := mt.collName2ID[collectionAlias]; ok {
		return fmt.Errorf("collection alias collides with existing collection name. collection = %s, alias = %s", collectionAlias, collectionAlias)
	}

	id, ok := mt.collName2ID[collectionName]
	if !ok {
		return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
	}
	mt.collAlias2ID[collectionAlias] = id

	meta := make(map[string]string)
	addition := mt.getAdditionKV(ddOpStr, meta)
	saveAlias := func(ts typeutil.Timestamp) (string, string, error) {
		k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
1250 1251 1252 1253 1254 1255
		v1, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
		if err != nil {
			log.Error("MetaTable AddAlias saveAlias Marshal CollectionInfo fail",
				zap.String("key", k1), zap.Error(err))
			return "", "", fmt.Errorf("MetaTable AddAlias saveAlias Marshal CollectionInfo fail key:%s, err:%w", k1, err)
		}
1256 1257
		meta[k1] = string(v1)
		return k1, string(v1), nil
Y
Yusup 已提交
1258 1259 1260 1261 1262 1263 1264 1265 1266 1267
	}

	err := mt.client.MultiSave(meta, ts, addition, saveAlias)
	if err != nil {
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
	}
	return nil
}

1268 1269
// DeleteAlias delete collection alias
func (mt *MetaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
Y
Yusup 已提交
1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
		return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
	}
	delete(mt.collAlias2ID, collectionAlias)

	delMetakeys := []string{
		fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias),
	}
	meta := make(map[string]string)
	addition := mt.getAdditionKV(ddOpStr, meta)
	err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts, addition)
	if err != nil {
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
	}
	return nil
}

1290 1291
// AlterAlias alter collection alias
func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
Y
Yusup 已提交
1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
		return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
	}

	id, ok := mt.collName2ID[collectionName]
	if !ok {
		return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
	}
	mt.collAlias2ID[collectionAlias] = id
	meta := make(map[string]string)
	addition := mt.getAdditionKV(ddOpStr, meta)
	alterAlias := func(ts typeutil.Timestamp) (string, string, error) {
		k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
1307 1308 1309 1310 1311 1312
		v1, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
		if err != nil {
			log.Error("MetaTable AlterAlias alterAlias Marshal CollectionInfo fail",
				zap.String("key", k1), zap.Error(err))
			return "", "", fmt.Errorf("MetaTable AlterAlias alterAlias Marshal CollectionInfo fail key:%s, err:%w", k1, err)
		}
1313 1314
		meta[k1] = string(v1)
		return k1, string(v1), nil
Y
Yusup 已提交
1315 1316 1317 1318 1319 1320 1321 1322 1323
	}

	err := mt.client.MultiSave(meta, ts, addition, alterAlias)
	if err != nil {
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
	}
	return nil
}