meta_table.go 45.3 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17
package rootcoord
18 19

import (
20
	"bytes"
S
sunby 已提交
21
	"fmt"
Z
zhenshan.cao 已提交
22
	"path"
23 24 25 26
	"strconv"
	"sync"

	"github.com/golang/protobuf/proto"
B
bigsheeper 已提交
27 28
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
29 30 31 32
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
33 34
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
X
Xiangyu Wang 已提交
35
	"github.com/milvus-io/milvus/internal/proto/schemapb"
36
	"github.com/milvus-io/milvus/internal/util/typeutil"
37 38
)

Z
zhenshan.cao 已提交
39
const (
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
	// ComponentPrefix prefix for rootcoord component
	ComponentPrefix = "root-coord"

	// TenantMetaPrefix prefix for tenant meta
	TenantMetaPrefix = ComponentPrefix + "/tenant"

	// ProxyMetaPrefix prefix for proxy meta
	ProxyMetaPrefix = ComponentPrefix + "/proxy"

	// CollectionMetaPrefix prefix for collection meta
	CollectionMetaPrefix = ComponentPrefix + "/collection"

	// SegmentIndexMetaPrefix prefix for segment index meta
	SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"

	// IndexMetaPrefix prefix for index meta
	IndexMetaPrefix = ComponentPrefix + "/index"

	// CollectionAliasMetaPrefix prefix for collection alias meta
Y
Yusup 已提交
59
	CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias"
60

61
	// TimestampPrefix prefix for timestamp
62 63
	TimestampPrefix = ComponentPrefix + "/timestamp"

64
	// DDOperationPrefix prefix for DD operation
65
	DDOperationPrefix = ComponentPrefix + "/dd-operation"
66

67 68 69 70
	// DDMsgSendPrefix prefix to indicate whether DD msg has been send
	DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"

	// CreateCollectionDDType name of DD type for create collection
71
	CreateCollectionDDType = "CreateCollection"
72 73 74 75 76 77 78 79 80

	// DropCollectionDDType name of DD type for drop collection
	DropCollectionDDType = "DropCollection"

	// CreatePartitionDDType name of DD type for create partition
	CreatePartitionDDType = "CreatePartition"

	// DropPartitionDDType name of DD type for drop partition
	DropPartitionDDType = "DropPartition"
81 82 83 84 85 86

	// UserSubPrefix subpath for credential user
	UserSubPrefix = "/credential/users"

	// CredentialPrefix prefix for credential user
	CredentialPrefix = ComponentPrefix + UserSubPrefix
Z
zhenshan.cao 已提交
87 88
)

89 90
// MetaTable store all rootcoord meta info
type MetaTable struct {
91 92
	txn             kv.TxnKV                                                        // client of a reliable txnkv service, i.e. etcd client
	snapshot        kv.SnapShotKV                                                   // client of a reliable snapshotkv service, i.e. etcd client
93 94 95 96
	tenantID2Meta   map[typeutil.UniqueID]pb.TenantMeta                             // tenant id to tenant meta
	proxyID2Meta    map[typeutil.UniqueID]pb.ProxyMeta                              // proxy id to proxy meta
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection name to collection id
Y
Yusup 已提交
97
	collAlias2ID    map[string]typeutil.UniqueID                                    // collection alias to collection id
98
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
99 100
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // collection_id/index_id -> meta
101 102 103 104

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
105
	credLock   sync.RWMutex
106 107
}

C
cxytz01 已提交
108
// NewMetaTable creates meta table for rootcoord, which stores all in-memory information
C
Cai Yudong 已提交
109
// for collection, partition, segment, index etc.
110
func NewMetaTable(txn kv.TxnKV, snap kv.SnapShotKV) (*MetaTable, error) {
111
	mt := &MetaTable{
112 113
		txn:        txn,
		snapshot:   snap,
114 115 116
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
117
		credLock:   sync.RWMutex{},
118 119 120 121 122 123 124 125
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

126
func (mt *MetaTable) reloadFromKV() error {
127 128
	mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
	mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
Z
zhenshan.cao 已提交
129
	mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
130
	mt.collName2ID = make(map[string]typeutil.UniqueID)
Y
Yusup 已提交
131
	mt.collAlias2ID = make(map[string]typeutil.UniqueID)
132
	mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
133
	mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
Z
zhenshan.cao 已提交
134
	mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
135

136
	_, values, err := mt.snapshot.LoadWithPrefix(TenantMetaPrefix, 0)
137 138 139 140 141 142
	if err != nil {
		return err
	}

	for _, value := range values {
		tenantMeta := pb.TenantMeta{}
143
		err := proto.Unmarshal([]byte(value), &tenantMeta)
144
		if err != nil {
145
			return fmt.Errorf("rootcoord Unmarshal pb.TenantMeta err:%w", err)
146 147 148 149
		}
		mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
	}

150
	_, values, err = mt.txn.LoadWithPrefix(ProxyMetaPrefix)
151 152 153 154 155
	if err != nil {
		return err
	}

	for _, value := range values {
156 157 158 159
		if bytes.Equal([]byte(value), suffixSnapshotTombstone) {
			// backward compatibility, IndexMeta used to be in SnapshotKV
			continue
		}
160
		proxyMeta := pb.ProxyMeta{}
161
		err = proto.Unmarshal([]byte(value), &proxyMeta)
162
		if err != nil {
163
			return fmt.Errorf("rootcoord Unmarshal pb.ProxyMeta err:%w", err)
164 165 166 167
		}
		mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
	}

168
	_, values, err = mt.snapshot.LoadWithPrefix(CollectionMetaPrefix, 0)
169 170 171 172 173
	if err != nil {
		return err
	}

	for _, value := range values {
174
		collInfo := pb.CollectionInfo{}
175
		err = proto.Unmarshal([]byte(value), &collInfo)
176
		if err != nil {
177
			return fmt.Errorf("rootcoord Unmarshal pb.CollectionInfo err:%w", err)
178
		}
179 180
		mt.collID2Meta[collInfo.ID] = collInfo
		mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
181 182
	}

183
	_, values, err = mt.txn.LoadWithPrefix(SegmentIndexMetaPrefix)
184 185 186
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
187
	for _, value := range values {
188 189 190 191
		if bytes.Equal([]byte(value), suffixSnapshotTombstone) {
			// backward compatibility, IndexMeta used to be in SnapshotKV
			continue
		}
Z
zhenshan.cao 已提交
192
		segmentIndexInfo := pb.SegmentIndexInfo{}
193
		err = proto.Unmarshal([]byte(value), &segmentIndexInfo)
194
		if err != nil {
195
			return fmt.Errorf("rootcoord Unmarshal pb.SegmentIndexInfo err:%w", err)
196
		}
197 198 199 200 201 202 203 204 205 206 207 208

		// update partID2SegID
		segIDMap, ok := mt.partID2SegID[segmentIndexInfo.PartitionID]
		if ok {
			segIDMap[segmentIndexInfo.SegmentID] = true
		} else {
			idMap := make(map[typeutil.UniqueID]bool)
			idMap[segmentIndexInfo.SegmentID] = true
			mt.partID2SegID[segmentIndexInfo.PartitionID] = idMap
		}

		// update segID2IndexMeta
Z
zhenshan.cao 已提交
209
		idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
210
		if ok {
211
			idx[segmentIndexInfo.IndexID] = segmentIndexInfo
Z
zhenshan.cao 已提交
212 213 214
		} else {
			meta := make(map[typeutil.UniqueID]pb.SegmentIndexInfo)
			meta[segmentIndexInfo.IndexID] = segmentIndexInfo
215
			mt.segID2IndexMeta[segmentIndexInfo.SegmentID] = meta
216 217 218
		}
	}

219
	_, values, err = mt.txn.LoadWithPrefix(IndexMetaPrefix)
Z
zhenshan.cao 已提交
220 221
	if err != nil {
		return err
222
	}
Z
zhenshan.cao 已提交
223
	for _, value := range values {
224 225 226 227
		if bytes.Equal([]byte(value), suffixSnapshotTombstone) {
			// backward compatibility, IndexMeta used to be in SnapshotKV
			continue
		}
Z
zhenshan.cao 已提交
228
		meta := pb.IndexInfo{}
229
		err = proto.Unmarshal([]byte(value), &meta)
Z
zhenshan.cao 已提交
230
		if err != nil {
231
			return fmt.Errorf("rootcoord Unmarshal pb.IndexInfo err:%w", err)
232
		}
Z
zhenshan.cao 已提交
233
		mt.indexID2Meta[meta.IndexID] = meta
234 235
	}

236
	_, values, err = mt.snapshot.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
Y
Yusup 已提交
237 238 239 240 241
	if err != nil {
		return err
	}
	for _, value := range values {
		aliasInfo := pb.CollectionInfo{}
242
		err = proto.Unmarshal([]byte(value), &aliasInfo)
Y
Yusup 已提交
243
		if err != nil {
244
			return fmt.Errorf("rootcoord Unmarshal pb.AliasInfo err:%w", err)
Y
Yusup 已提交
245 246 247 248
		}
		mt.collAlias2ID[aliasInfo.Schema.Name] = aliasInfo.ID
	}

249
	log.Debug("reload meta table from KV successfully")
Z
zhenshan.cao 已提交
250
	return nil
251 252
}

253 254
// AddTenant add tenant
func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
255 256 257 258
	mt.tenantLock.Lock()
	defer mt.tenantLock.Unlock()

	k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
259 260
	v, err := proto.Marshal(te)
	if err != nil {
261
		log.Error("Failed to marshal TenantMeta in AddTenant", zap.Error(err))
262 263
		return err
	}
N
neza2017 已提交
264

265
	err = mt.snapshot.Save(k, string(v), ts)
266
	if err != nil {
267
		log.Error("Failed to save TenantMeta in AddTenant", zap.Error(err))
268
		return err
N
neza2017 已提交
269 270
	}
	mt.tenantID2Meta[te.ID] = *te
271
	return nil
N
neza2017 已提交
272 273
}

274
// AddProxy add proxy
275
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta) error {
N
neza2017 已提交
276 277 278 279
	mt.proxyLock.Lock()
	defer mt.proxyLock.Unlock()

	k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
280 281
	v, err := proto.Marshal(po)
	if err != nil {
282
		log.Error("Failed to marshal ProxyMeta in AddProxy", zap.Error(err))
283 284
		return err
	}
N
neza2017 已提交
285

286
	err = mt.txn.Save(k, string(v))
287
	if err != nil {
288 289
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
290 291
	}
	mt.proxyID2Meta[po.ID] = *po
292
	return nil
N
neza2017 已提交
293 294
}

295
// AddCollection add collection
296
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr string) error {
297 298
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
Z
zhenshan.cao 已提交
299

300 301
	if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
		len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
302
		(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
303
		return fmt.Errorf("partition parameters' length mis-match when creating collection")
304
	}
305
	if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
306
		return fmt.Errorf("collection %s exist", coll.Schema.Name)
307
	}
N
neza2017 已提交
308
	if len(coll.FieldIndexes) != len(idx) {
309
		return fmt.Errorf("incorrect index id when creating collection")
N
neza2017 已提交
310
	}
311

312 313 314 315 316 317
	coll.CreateTime = ts
	if len(coll.PartitionCreatedTimestamps) == 1 {
		coll.PartitionCreatedTimestamps[0] = ts
	}
	mt.collID2Meta[coll.ID] = *coll
	mt.collName2ID[coll.Schema.Name] = coll.ID
N
neza2017 已提交
318 319 320
	for _, i := range idx {
		mt.indexID2Meta[i.IndexID] = *i
	}
Z
zhenshan.cao 已提交
321

322 323 324 325 326
	k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
	v1, err := proto.Marshal(coll)
	if err != nil {
		log.Error("MetaTable AddCollection saveColl Marshal fail",
			zap.String("key", k1), zap.Error(err))
327
		return fmt.Errorf("metaTable AddCollection Marshal fail key:%s, err:%w", k1, err)
328 329
	}
	meta := map[string]string{k1: string(v1)}
Z
zhenshan.cao 已提交
330

N
neza2017 已提交
331
	for _, i := range idx {
N
neza2017 已提交
332
		k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
333 334 335 336
		v, err := proto.Marshal(i)
		if err != nil {
			log.Error("MetaTable AddCollection Marshal fail", zap.String("key", k),
				zap.String("IndexName", i.IndexName), zap.Error(err))
337
			return fmt.Errorf("metaTable AddCollection Marshal fail key:%s, err:%w", k, err)
338
		}
339
		meta[k] = string(v)
N
neza2017 已提交
340 341
	}

342
	// save ddOpStr into etcd
343 344
	meta[DDMsgSendPrefix] = "false"
	meta[DDOperationPrefix] = ddOpStr
345

346
	err = mt.snapshot.MultiSave(meta, ts)
347
	if err != nil {
348 349
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
350
	}
351

352
	return nil
353 354
}

355
// DeleteCollection delete collection
356
func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr string) error {
357 358 359 360 361
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
362
		return fmt.Errorf("can't find collection. id = %d", collID)
363 364
	}

Z
zhenshan.cao 已提交
365 366
	delete(mt.collID2Meta, collID)
	delete(mt.collName2ID, collMeta.Schema.Name)
367 368 369 370 371 372 373

	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				delete(mt.segID2IndexMeta, segID)
			}
Z
zhenshan.cao 已提交
374 375
		}
	}
376 377 378 379 380 381

	// update partID2SegID
	for partID := range collMeta.PartitionIDs {
		delete(mt.partID2SegID, typeutil.UniqueID(partID))
	}

N
neza2017 已提交
382 383 384 385 386 387 388 389
	for _, idxInfo := range collMeta.FieldIndexes {
		_, ok := mt.indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
			continue
		}
		delete(mt.indexID2Meta, idxInfo.IndexID)
	}
Y
Yusup 已提交
390 391 392 393 394 395 396
	var aliases []string
	// delete collection aliases
	for alias, cid := range mt.collAlias2ID {
		if cid == collID {
			aliases = append(aliases, alias)
		}
	}
397

398
	delMetakeysSnap := []string{
N
neza2017 已提交
399
		fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
400 401
	}
	delMetaKeysTxn := []string{
N
neza2017 已提交
402 403 404
		fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
		fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
	}
405

Y
Yusup 已提交
406 407
	for _, alias := range aliases {
		delete(mt.collAlias2ID, alias)
408
		delMetakeysSnap = append(delMetakeysSnap,
Y
Yusup 已提交
409 410 411 412
			fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
		)
	}

413
	// save ddOpStr into etcd
414 415 416 417 418
	var saveMeta = map[string]string{
		DDMsgSendPrefix:   "false",
		DDOperationPrefix: ddOpStr,
	}

419
	err := mt.snapshot.MultiSaveAndRemoveWithPrefix(map[string]string{}, delMetakeysSnap, ts)
420
	if err != nil {
421 422
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
423
	}
424 425 426 427 428
	err = mt.txn.MultiSaveAndRemoveWithPrefix(saveMeta, delMetaKeysTxn)
	if err != nil {
		log.Warn("TxnKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		//Txn kv fail will no panic here, treated as garbage
	}
429

430
	return nil
431 432
}

433 434
// HasCollection return collection existence
func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
435 436
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
437 438 439 440 441
	if ts == 0 {
		_, ok := mt.collID2Meta[collID]
		return ok
	}
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
442
	_, err := mt.snapshot.Load(key, ts)
443
	return err == nil
444 445
}

446 447
// GetCollectionByID return collection meta by collection id
func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
N
neza2017 已提交
448 449 450
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

451 452 453 454 455 456 457
	if ts == 0 {
		col, ok := mt.collID2Meta[collectionID]
		if !ok {
			return nil, fmt.Errorf("can't find collection id : %d", collectionID)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
458
	}
459
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
460
	val, err := mt.snapshot.Load(key, ts)
461 462 463 464
	if err != nil {
		return nil, err
	}
	colMeta := pb.CollectionInfo{}
465
	err = proto.Unmarshal([]byte(val), &colMeta)
466 467 468 469
	if err != nil {
		return nil, err
	}
	return &colMeta, nil
N
neza2017 已提交
470 471
}

472 473
// GetCollectionByName return collection meta by collection name
func (mt *MetaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
474 475 476
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

477 478 479
	if ts == 0 {
		vid, ok := mt.collName2ID[collectionName]
		if !ok {
Y
Yusup 已提交
480 481 482
			if vid, ok = mt.collAlias2ID[collectionName]; !ok {
				return nil, fmt.Errorf("can't find collection: " + collectionName)
			}
483 484 485
		}
		col, ok := mt.collID2Meta[vid]
		if !ok {
S
sunby 已提交
486
			return nil, fmt.Errorf("can't find collection %s with id %d", collectionName, vid)
487 488 489
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
490
	}
491
	_, vals, err := mt.snapshot.LoadWithPrefix(CollectionMetaPrefix, ts)
492 493
	if err != nil {
		return nil, err
N
neza2017 已提交
494
	}
495 496
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
497
		err = proto.Unmarshal([]byte(val), &collMeta)
498 499 500 501 502 503 504 505 506
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
			continue
		}
		if collMeta.Schema.Name == collectionName {
			return &collMeta, nil
		}
	}
	return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
N
neza2017 已提交
507 508
}

509 510
// ListCollections list all collection names
func (mt *MetaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
511 512
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
513
	colls := make(map[string]*pb.CollectionInfo)
514

515
	if ts == 0 {
516 517 518 519
		for collName, collID := range mt.collName2ID {
			coll := mt.collID2Meta[collID]
			colCopy := proto.Clone(&coll)
			colls[collName] = colCopy.(*pb.CollectionInfo)
N
neza2017 已提交
520 521
		}
		return colls, nil
522
	}
523
	_, vals, err := mt.snapshot.LoadWithPrefix(CollectionMetaPrefix, ts)
524
	if err != nil {
N
neza2017 已提交
525
		log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
526
		return nil, nil
527 528 529
	}
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
530
		err := proto.Unmarshal([]byte(val), &collMeta)
531 532 533
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
		}
534
		colls[collMeta.Schema.Name] = &collMeta
535 536 537 538
	}
	return colls, nil
}

539 540
// ListAliases list all collection aliases
func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string {
541 542 543 544 545 546 547 548 549 550 551
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	var aliases []string
	for alias, cid := range mt.collAlias2ID {
		if cid == collID {
			aliases = append(aliases, alias)
		}
	}
	return aliases
}

552
// ListCollectionVirtualChannels list virtual channels of all collections
553
func (mt *MetaTable) ListCollectionVirtualChannels() map[typeutil.UniqueID][]string {
554 555
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
556
	chanMap := make(map[typeutil.UniqueID][]string)
557

558 559
	for id, collInfo := range mt.collID2Meta {
		chanMap[id] = collInfo.VirtualChannelNames
560
	}
561
	return chanMap
562 563
}

564
// ListCollectionPhysicalChannels list physical channels of all collections
565
func (mt *MetaTable) ListCollectionPhysicalChannels() map[typeutil.UniqueID][]string {
566 567
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
568
	chanMap := make(map[typeutil.UniqueID][]string)
569

570 571
	for id, collInfo := range mt.collID2Meta {
		chanMap[id] = collInfo.PhysicalChannelNames
572
	}
573
	return chanMap
574 575
}

576
// AddPartition add partition
577
func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr string) error {
578 579 580 581
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collID2Meta[collID]
	if !ok {
582
		return fmt.Errorf("can't find collection. id = %d", collID)
583 584 585
	}

	// number of partition tags (except _default) should be limited to 4096 by default
586 587
	if int64(len(coll.PartitionIDs)) >= Params.RootCoordCfg.MaxPartitionNum {
		return fmt.Errorf("maximum partition's number should be limit to %d", Params.RootCoordCfg.MaxPartitionNum)
588
	}
589

590
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
591
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
592 593
	}

594
	if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) {
595
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
596 597 598
	}

	if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
599
		return fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
600 601
	}

602 603
	for idx := range coll.PartitionIDs {
		if coll.PartitionIDs[idx] == partitionID {
604
			return fmt.Errorf("partition id = %d already exists", partitionID)
Z
zhenshan.cao 已提交
605
		}
606
		if coll.PartitionNames[idx] == partitionName {
607
			return fmt.Errorf("partition name = %s already exists", partitionName)
608
		}
609
		// no necessary to check created timestamp
610
	}
Z
zhenshan.cao 已提交
611

612 613 614 615
	coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
	coll.PartitionNames = append(coll.PartitionNames, partitionName)
	coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
	mt.collID2Meta[collID] = coll
616

617 618 619 620 621
	k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	v1, err := proto.Marshal(&coll)
	if err != nil {
		log.Error("MetaTable AddPartition saveColl Marshal fail",
			zap.String("key", k1), zap.Error(err))
622
		return fmt.Errorf("metaTable AddPartition Marshal fail, k1:%s, err:%w", k1, err)
623
	}
624
	meta := map[string]string{k1: string(v1)}
625
	metaTxn := map[string]string{}
626
	// save ddOpStr into etcd
627 628
	metaTxn[DDMsgSendPrefix] = "false"
	metaTxn[DDOperationPrefix] = ddOpStr
629

630
	err = mt.snapshot.MultiSave(meta, ts)
631
	if err != nil {
632 633
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
634
	}
635 636 637 638 639
	err = mt.txn.MultiSave(metaTxn)
	if err != nil {
		// will not panic, missing create msg
		log.Warn("TxnKV MultiSave fail", zap.Error(err))
	}
640
	return nil
641 642
}

643 644
// GetPartitionNameByID return partition name by partition id
func (mt *MetaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
645
	if ts == 0 {
646 647
		mt.ddLock.RLock()
		defer mt.ddLock.RUnlock()
648 649
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
650
			return "", fmt.Errorf("can't find collection id = %d", collID)
651
		}
652 653
		for idx := range collMeta.PartitionIDs {
			if collMeta.PartitionIDs[idx] == partitionID {
654
				return collMeta.PartitionNames[idx], nil
655 656
			}
		}
657
		return "", fmt.Errorf("partition %d does not exist", partitionID)
658 659
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
660
	collVal, err := mt.snapshot.Load(collKey, ts)
661
	if err != nil {
662
		return "", err
663
	}
664
	collMeta := pb.CollectionInfo{}
665
	err = proto.Unmarshal([]byte(collVal), &collMeta)
666
	if err != nil {
667
		return "", err
668
	}
669 670
	for idx := range collMeta.PartitionIDs {
		if collMeta.PartitionIDs[idx] == partitionID {
671
			return collMeta.PartitionNames[idx], nil
672
		}
673 674 675 676
	}
	return "", fmt.Errorf("partition %d does not exist", partitionID)
}

677
func (mt *MetaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
678 679 680 681
	if ts == 0 {
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
			return 0, fmt.Errorf("can't find collection id = %d", collID)
682
		}
683
		for idx := range collMeta.PartitionIDs {
684
			if collMeta.PartitionNames[idx] == partitionName {
685 686 687 688 689 690
				return collMeta.PartitionIDs[idx], nil
			}
		}
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
691
	collVal, err := mt.snapshot.Load(collKey, ts)
692 693 694 695
	if err != nil {
		return 0, err
	}
	collMeta := pb.CollectionInfo{}
696
	err = proto.Unmarshal([]byte(collVal), &collMeta)
697 698 699 700
	if err != nil {
		return 0, err
	}
	for idx := range collMeta.PartitionIDs {
701
		if collMeta.PartitionNames[idx] == partitionName {
702
			return collMeta.PartitionIDs[idx], nil
703 704
		}
	}
705
	return 0, fmt.Errorf("partition %s does not exist", partitionName)
706 707
}

708 709
// GetPartitionByName return partition id by partition name
func (mt *MetaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
710 711
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
712
	return mt.getPartitionByName(collID, partitionName, ts)
713 714
}

715 716
// HasPartition check partition existence
func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
717 718
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
719
	_, err := mt.getPartitionByName(collID, partitionName, ts)
720 721 722
	return err == nil
}

723
// DeletePartition delete partition
724
func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr string) (typeutil.UniqueID, error) {
725 726 727
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

728
	if partitionName == Params.CommonCfg.DefaultPartitionName {
729
		return 0, fmt.Errorf("default partition cannot be deleted")
730 731 732 733
	}

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
734
		return 0, fmt.Errorf("can't find collection id = %d", collID)
735 736 737 738 739 740
	}

	// check tag exists
	exist := false

	pd := make([]typeutil.UniqueID, 0, len(collMeta.PartitionIDs))
741
	pn := make([]string, 0, len(collMeta.PartitionNames))
742
	pts := make([]uint64, 0, len(collMeta.PartitionCreatedTimestamps))
743 744
	var partID typeutil.UniqueID
	for idx := range collMeta.PartitionIDs {
745
		if collMeta.PartitionNames[idx] == partitionName {
746 747 748 749
			partID = collMeta.PartitionIDs[idx]
			exist = true
		} else {
			pd = append(pd, collMeta.PartitionIDs[idx])
750
			pn = append(pn, collMeta.PartitionNames[idx])
751
			pts = append(pts, collMeta.PartitionCreatedTimestamps[idx])
752 753 754
		}
	}
	if !exist {
755
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
756
	}
Z
zhenshan.cao 已提交
757
	collMeta.PartitionIDs = pd
758
	collMeta.PartitionNames = pn
759
	collMeta.PartitionCreatedTimestamps = pts
Z
zhenshan.cao 已提交
760
	mt.collID2Meta[collID] = collMeta
761

762 763 764 765
	// update segID2IndexMeta and partID2SegID
	if segIDMap, ok := mt.partID2SegID[partID]; ok {
		for segID := range segIDMap {
			delete(mt.segID2IndexMeta, segID)
766 767
		}
	}
768 769
	delete(mt.partID2SegID, partID)

770
	k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
771 772 773 774
	v, err := proto.Marshal(&collMeta)
	if err != nil {
		log.Error("MetaTable DeletePartition Marshal collectionMeta fail",
			zap.String("key", k), zap.Error(err))
775
		return 0, fmt.Errorf("metaTable DeletePartition Marshal collectionMeta fail key:%s, err:%w", k, err)
776 777
	}
	var delMetaKeys []string
N
neza2017 已提交
778
	for _, idxInfo := range collMeta.FieldIndexes {
779
		k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
N
neza2017 已提交
780 781 782
		delMetaKeys = append(delMetaKeys, k)
	}

783
	metaTxn := make(map[string]string)
784
	// save ddOpStr into etcd
785 786
	metaTxn[DDMsgSendPrefix] = "false"
	metaTxn[DDOperationPrefix] = ddOpStr
787

788
	err = mt.snapshot.Save(k, string(v), ts)
789
	if err != nil {
790 791
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
792
	}
793 794 795 796 797 798
	err = mt.txn.MultiSaveAndRemoveWithPrefix(metaTxn, delMetaKeys)
	if err != nil {
		log.Warn("TxnKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		// will not panic, failed txn shall be treated by garbage related logic
	}

799
	return partID, nil
800 801
}

802
// AddIndex add index
803
func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) error {
804 805
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
806

807
	collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
808
	if !ok {
809
		return fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
810
	}
811 812 813 814 815 816
	exist := false
	for _, fidx := range collMeta.FieldIndexes {
		if fidx.IndexID == segIdxInfo.IndexID {
			exist = true
			break
		}
817
	}
818
	if !exist {
819
		return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
820
	}
821

822 823 824 825
	segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
	if !ok {
		idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
		mt.segID2IndexMeta[segIdxInfo.SegmentID] = idxMap
826 827 828

		segIDMap := map[typeutil.UniqueID]bool{segIdxInfo.SegmentID: true}
		mt.partID2SegID[segIdxInfo.PartitionID] = segIDMap
829 830 831 832 833
	} else {
		tmpInfo, ok := segIdxMap[segIdxInfo.IndexID]
		if ok {
			if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
				if segIdxInfo.BuildID == tmpInfo.BuildID {
834
					log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
835
					return nil
836
				}
837
				return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
838
			}
839 840 841
		}
	}

842
	mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
843 844
	mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true

845
	k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
846 847 848 849
	v, err := proto.Marshal(segIdxInfo)
	if err != nil {
		log.Error("MetaTable AddIndex Marshal segIdxInfo fail",
			zap.String("key", k), zap.Error(err))
850
		return fmt.Errorf("metaTable AddIndex Marshal segIdxInfo fail key:%s, err:%w", k, err)
851
	}
N
neza2017 已提交
852

853
	err = mt.txn.Save(k, string(v))
N
neza2017 已提交
854
	if err != nil {
855 856
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
857
	}
858

859
	return nil
N
neza2017 已提交
860 861
}

862
// DropIndex drop index
863
func (mt *MetaTable) DropIndex(collName, fieldName, indexName string) (typeutil.UniqueID, bool, error) {
N
neza2017 已提交
864 865 866 867 868
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
869 870 871 872
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return 0, false, fmt.Errorf("collection name = %s not exist", collName)
		}
N
neza2017 已提交
873 874 875
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
876
		return 0, false, fmt.Errorf("collection name  = %s not has meta", collName)
N
neza2017 已提交
877 878 879
	}
	fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
	if err != nil {
880
		return 0, false, err
N
neza2017 已提交
881 882 883 884 885 886 887 888 889 890 891
	}
	fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
	var dropIdxID typeutil.UniqueID
	for i, info := range collMeta.FieldIndexes {
		if info.FiledID != fieldSch.FieldID {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		idxMeta, ok := mt.indexID2Meta[info.IndexID]
		if !ok {
			fieldIdxInfo = append(fieldIdxInfo, info)
N
neza2017 已提交
892
			log.Warn("index id not has meta", zap.Int64("index id", info.IndexID))
N
neza2017 已提交
893 894 895 896 897 898 899 900 901 902 903
			continue
		}
		if idxMeta.IndexName != indexName {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		dropIdxID = info.IndexID
		fieldIdxInfo = append(fieldIdxInfo, collMeta.FieldIndexes[i+1:]...)
		break
	}
	if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
N
neza2017 已提交
904
		log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
905
		return 0, false, nil
N
neza2017 已提交
906 907 908
	}
	collMeta.FieldIndexes = fieldIdxInfo
	mt.collID2Meta[collID] = collMeta
909
	k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
910 911 912 913
	v, err := proto.Marshal(&collMeta)
	if err != nil {
		log.Error("MetaTable DropIndex Marshal collMeta fail",
			zap.String("key", k), zap.Error(err))
914
		return 0, false, fmt.Errorf("metaTable DropIndex Marshal collMeta fail key:%s, err:%w", k, err)
915
	}
916
	saveMeta := map[string]string{k: string(v)}
N
neza2017 已提交
917 918 919

	delete(mt.indexID2Meta, dropIdxID)

920 921 922 923 924 925 926
	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				if segIndexInfos, ok := mt.segID2IndexMeta[segID]; ok {
					delete(segIndexInfos, dropIdxID)
				}
N
neza2017 已提交
927 928 929
			}
		}
	}
930

N
neza2017 已提交
931 932 933 934
	delMeta := []string{
		fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
		fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
	}
N
neza2017 已提交
935

936
	err = mt.txn.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
N
neza2017 已提交
937
	if err != nil {
938 939
		log.Error("TxnKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("TxnKV MultiSaveAndRemoveWithPrefix fail")
N
neza2017 已提交
940 941
	}

942
	return dropIdxID, true, nil
N
neza2017 已提交
943 944
}

945
// GetSegmentIndexInfoByID return segment index info by segment id
946
func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, fieldID int64, idxName string) (pb.SegmentIndexInfo, error) {
N
neza2017 已提交
947 948 949 950 951
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	segIdxMap, ok := mt.segID2IndexMeta[segID]
	if !ok {
952 953
		return pb.SegmentIndexInfo{
			SegmentID:   segID,
954
			FieldID:     fieldID,
955 956 957 958
			IndexID:     0,
			BuildID:     0,
			EnableIndex: false,
		}, nil
N
neza2017 已提交
959
	}
960
	if len(segIdxMap) == 0 {
S
sunby 已提交
961
		return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
N
neza2017 已提交
962 963
	}

964
	if fieldID == -1 && idxName == "" { // return default index
965
		for _, seg := range segIdxMap {
B
bigsheeper 已提交
966
			info, ok := mt.indexID2Meta[seg.IndexID]
967
			if ok && info.IndexName == Params.CommonCfg.DefaultIndexName {
B
bigsheeper 已提交
968 969
				return seg, nil
			}
N
neza2017 已提交
970 971
		}
	} else {
972
		for idxID, seg := range segIdxMap {
N
neza2017 已提交
973 974 975 976 977
			idxMeta, ok := mt.indexID2Meta[idxID]
			if ok {
				if idxMeta.IndexName != idxName {
					continue
				}
978
				if seg.FieldID != fieldID {
N
neza2017 已提交
979 980 981 982 983 984
					continue
				}
				return seg, nil
			}
		}
	}
985
	return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, fieldID)
N
neza2017 已提交
986 987
}

988 989 990 991 992 993 994 995 996 997 998 999
func (mt *MetaTable) GetSegmentIndexInfos(segID typeutil.UniqueID) (map[typeutil.UniqueID]pb.SegmentIndexInfo, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	ret, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return nil, fmt.Errorf("segment not found in meta, segment: %d", segID)
	}

	return ret, nil
}

1000 1001
// GetFieldSchema return field schema
func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
1002 1003 1004
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

N
neza2017 已提交
1005 1006 1007
	return mt.unlockGetFieldSchema(collName, fieldName)
}

1008
func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
1009 1010
	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
1011 1012 1013 1014
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
		}
N
neza2017 已提交
1015 1016 1017
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
1018
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1019 1020 1021 1022 1023 1024 1025
	}

	for _, field := range collMeta.Schema.Fields {
		if field.Name == fieldName {
			return *field, nil
		}
	}
S
sunby 已提交
1026
	return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
N
neza2017 已提交
1027 1028
}

1029
// IsSegmentIndexed check if segment has indexed
1030
func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
1031 1032
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
1033 1034 1035
	return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}

1036
func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
1037 1038 1039 1040 1041
	segIdx, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return false
	}
	exist := false
1042
	for idxID, meta := range segIdx {
N
neza2017 已提交
1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057
		if meta.FieldID != fieldSchema.FieldID {
			continue
		}
		idxMeta, ok := mt.indexID2Meta[idxID]
		if !ok {
			continue
		}
		if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
			exist = true
			break
		}
	}
	return exist
}

1058
// GetNotIndexedSegments return segment ids which have no index
1059
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
N
neza2017 已提交
1060 1061 1062
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

N
neza2017 已提交
1063 1064 1065
	if idxInfo.IndexParams == nil {
		return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
	}
N
neza2017 已提交
1066 1067
	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
1068 1069 1070 1071
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
		}
N
neza2017 已提交
1072 1073 1074
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
1075
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1076
	}
N
neza2017 已提交
1077
	fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
N
neza2017 已提交
1078 1079 1080 1081
	if err != nil {
		return nil, fieldSchema, err
	}

1082
	var dupIdx typeutil.UniqueID
N
neza2017 已提交
1083
	for _, f := range collMeta.FieldIndexes {
1084 1085 1086 1087
		if info, ok := mt.indexID2Meta[f.IndexID]; ok {
			if info.IndexName == idxInfo.IndexName {
				dupIdx = info.IndexID
				break
N
neza2017 已提交
1088 1089 1090
			}
		}
	}
N
neza2017 已提交
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106

	exist := false
	var existInfo pb.IndexInfo
	for _, f := range collMeta.FieldIndexes {
		if f.FiledID == fieldSchema.FieldID {
			existInfo, ok = mt.indexID2Meta[f.IndexID]
			if !ok {
				return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
			}
			if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
				exist = true
				break
			}
		}
	}
	if !exist {
N
neza2017 已提交
1107
		idx := &pb.FieldIndexInfo{
N
neza2017 已提交
1108 1109
			FiledID: fieldSchema.FieldID,
			IndexID: idxInfo.IndexID,
N
neza2017 已提交
1110 1111
		}
		collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
N
neza2017 已提交
1112 1113
		mt.collID2Meta[collMeta.ID] = collMeta
		k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
1114 1115 1116 1117
		v1, err := proto.Marshal(&collMeta)
		if err != nil {
			log.Error("MetaTable GetNotIndexedSegments Marshal collMeta fail",
				zap.String("key", k1), zap.Error(err))
1118
			return nil, schemapb.FieldSchema{}, fmt.Errorf("metaTable GetNotIndexedSegments Marshal collMeta fail key:%s, err:%w", k1, err)
1119
		}
N
neza2017 已提交
1120

N
neza2017 已提交
1121 1122
		mt.indexID2Meta[idx.IndexID] = *idxInfo
		k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
1123 1124 1125 1126
		v2, err := proto.Marshal(idxInfo)
		if err != nil {
			log.Error("MetaTable GetNotIndexedSegments Marshal idxInfo fail",
				zap.String("key", k2), zap.Error(err))
1127
			return nil, schemapb.FieldSchema{}, fmt.Errorf("metaTable GetNotIndexedSegments Marshal idxInfo fail key:%s, err:%w", k2, err)
1128
		}
1129
		meta := map[string]string{k1: string(v1), k2: string(v2)}
N
neza2017 已提交
1130

N
neza2017 已提交
1131 1132 1133 1134 1135
		if dupIdx != 0 {
			dupInfo := mt.indexID2Meta[dupIdx]
			dupInfo.IndexName = dupInfo.IndexName + "_bak"
			mt.indexID2Meta[dupIdx] = dupInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
1136 1137 1138 1139
			v, err := proto.Marshal(&dupInfo)
			if err != nil {
				log.Error("MetaTable GetNotIndexedSegments Marshal dupInfo fail",
					zap.String("key", k), zap.Error(err))
1140
				return nil, schemapb.FieldSchema{}, fmt.Errorf("metaTable GetNotIndexedSegments Marshal dupInfo fail key:%s, err:%w", k, err)
1141
			}
1142
			meta[k] = string(v)
N
neza2017 已提交
1143
		}
1144
		err = mt.txn.MultiSave(meta)
N
neza2017 已提交
1145
		if err != nil {
1146 1147
			log.Error("TxnKV MultiSave fail", zap.Error(err))
			panic("TxnKV MultiSave fail")
N
neza2017 已提交
1148
		}
N
neza2017 已提交
1149 1150 1151 1152 1153 1154
	} else {
		idxInfo.IndexID = existInfo.IndexID
		if existInfo.IndexName != idxInfo.IndexName { //replace index name
			existInfo.IndexName = idxInfo.IndexName
			mt.indexID2Meta[existInfo.IndexID] = existInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
1155 1156 1157 1158
			v, err := proto.Marshal(&existInfo)
			if err != nil {
				log.Error("MetaTable GetNotIndexedSegments Marshal existInfo fail",
					zap.String("key", k), zap.Error(err))
1159
				return nil, schemapb.FieldSchema{}, fmt.Errorf("metaTable GetNotIndexedSegments Marshal existInfo fail key:%s, err:%w", k, err)
1160
			}
1161
			meta := map[string]string{k: string(v)}
N
neza2017 已提交
1162 1163 1164 1165 1166
			if dupIdx != 0 {
				dupInfo := mt.indexID2Meta[dupIdx]
				dupInfo.IndexName = dupInfo.IndexName + "_bak"
				mt.indexID2Meta[dupIdx] = dupInfo
				k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
1167 1168 1169 1170
				v, err := proto.Marshal(&dupInfo)
				if err != nil {
					log.Error("MetaTable GetNotIndexedSegments Marshal dupInfo fail",
						zap.String("key", k), zap.Error(err))
1171
					return nil, schemapb.FieldSchema{}, fmt.Errorf("metaTable GetNotIndexedSegments Marshal dupInfo fail key:%s, err:%w", k, err)
1172
				}
1173
				meta[k] = string(v)
N
neza2017 已提交
1174 1175
			}

1176
			err = mt.txn.MultiSave(meta)
N
neza2017 已提交
1177
			if err != nil {
1178 1179
				log.Error("SnapShotKV MultiSave fail", zap.Error(err))
				panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
1180 1181
			}
		}
N
neza2017 已提交
1182 1183
	}

N
neza2017 已提交
1184
	rstID := make([]typeutil.UniqueID, 0, 16)
1185 1186 1187
	for _, segID := range segIDs {
		if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
			rstID = append(rstID, segID)
N
neza2017 已提交
1188 1189 1190 1191 1192
		}
	}
	return rstID, fieldSchema, nil
}

1193 1194
// GetIndexByName return index info by index name
func (mt *MetaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
N
neza2017 已提交
1195
	mt.ddLock.RLock()
S
sunby 已提交
1196
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
1197 1198 1199

	collID, ok := mt.collName2ID[collName]
	if !ok {
Y
Yusup 已提交
1200 1201 1202 1203
		collID, ok = mt.collAlias2ID[collName]
		if !ok {
			return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
		}
N
neza2017 已提交
1204 1205 1206
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
1207
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1208 1209
	}

N
neza2017 已提交
1210
	rstIndex := make([]pb.IndexInfo, 0, len(collMeta.FieldIndexes))
Z
zhenshan.cao 已提交
1211
	for _, idx := range collMeta.FieldIndexes {
1212 1213
		idxInfo, ok := mt.indexID2Meta[idx.IndexID]
		if !ok {
1214
			return pb.CollectionInfo{}, nil, fmt.Errorf("index id = %d not found", idx.IndexID)
1215 1216 1217
		}
		if indexName == "" || idxInfo.IndexName == indexName {
			rstIndex = append(rstIndex, idxInfo)
N
neza2017 已提交
1218 1219
		}
	}
1220
	return collMeta, rstIndex, nil
N
neza2017 已提交
1221
}
B
bigsheeper 已提交
1222

1223 1224
// GetIndexByID return index info by index id
func (mt *MetaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
B
bigsheeper 已提交
1225
	mt.ddLock.RLock()
S
sunby 已提交
1226
	defer mt.ddLock.RUnlock()
B
bigsheeper 已提交
1227 1228 1229

	indexInfo, ok := mt.indexID2Meta[indexID]
	if !ok {
S
sunby 已提交
1230
		return nil, fmt.Errorf("cannot find index, id = %d", indexID)
B
bigsheeper 已提交
1231 1232 1233
	}
	return &indexInfo, nil
}
1234

1235
func (mt *MetaTable) dupMeta() (
1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259
	map[typeutil.UniqueID]pb.CollectionInfo,
	map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
	map[typeutil.UniqueID]pb.IndexInfo,
) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	collID2Meta := map[typeutil.UniqueID]pb.CollectionInfo{}
	segID2IndexMeta := map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo{}
	indexID2Meta := map[typeutil.UniqueID]pb.IndexInfo{}
	for k, v := range mt.collID2Meta {
		collID2Meta[k] = v
	}
	for k, v := range mt.segID2IndexMeta {
		segID2IndexMeta[k] = map[typeutil.UniqueID]pb.SegmentIndexInfo{}
		for k2, v2 := range v {
			segID2IndexMeta[k][k2] = v2
		}
	}
	for k, v := range mt.indexID2Meta {
		indexID2Meta[k] = v
	}
	return collID2Meta, segID2IndexMeta, indexID2Meta
}
Y
Yusup 已提交
1260

1261
// AddAlias add collection alias
1262
func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp) error {
Y
Yusup 已提交
1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; ok {
		return fmt.Errorf("duplicate collection alias, alias = %s", collectionAlias)
	}

	if _, ok := mt.collName2ID[collectionAlias]; ok {
		return fmt.Errorf("collection alias collides with existing collection name. collection = %s, alias = %s", collectionAlias, collectionAlias)
	}

	id, ok := mt.collName2ID[collectionName]
	if !ok {
		return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
	}
	mt.collAlias2ID[collectionAlias] = id

1279 1280 1281 1282 1283
	k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
	v, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
	if err != nil {
		log.Error("MetaTable AddAlias Marshal CollectionInfo fail",
			zap.String("key", k), zap.Error(err))
1284
		return fmt.Errorf("metaTable AddAlias Marshal CollectionInfo fail key:%s, err:%w", k, err)
Y
Yusup 已提交
1285 1286
	}

1287
	err = mt.snapshot.Save(k, string(v), ts)
Y
Yusup 已提交
1288
	if err != nil {
1289 1290
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
Y
Yusup 已提交
1291 1292 1293 1294
	}
	return nil
}

1295 1296
// DropAlias drop collection alias
func (mt *MetaTable) DropAlias(collectionAlias string, ts typeutil.Timestamp) error {
Y
Yusup 已提交
1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
		return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
	}
	delete(mt.collAlias2ID, collectionAlias)

	delMetakeys := []string{
		fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias),
	}
	meta := make(map[string]string)
1308
	err := mt.snapshot.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts)
Y
Yusup 已提交
1309
	if err != nil {
1310 1311
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
Y
Yusup 已提交
1312 1313 1314 1315
	}
	return nil
}

1316
// AlterAlias alter collection alias
1317
func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp) error {
Y
Yusup 已提交
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
		return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
	}

	id, ok := mt.collName2ID[collectionName]
	if !ok {
		return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
	}
	mt.collAlias2ID[collectionAlias] = id
1329 1330 1331 1332 1333 1334

	k := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
	v, err := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
	if err != nil {
		log.Error("MetaTable AlterAlias Marshal CollectionInfo fail",
			zap.String("key", k), zap.Error(err))
1335
		return fmt.Errorf("metaTable AlterAlias Marshal CollectionInfo fail key:%s, err:%w", k, err)
Y
Yusup 已提交
1336 1337
	}

1338
	err = mt.snapshot.Save(k, string(v), ts)
Y
Yusup 已提交
1339
	if err != nil {
1340 1341
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
Y
Yusup 已提交
1342 1343 1344
	}
	return nil
}
1345

1346
// IsAlias returns true if specific `collectionAlias` is an alias of collection.
1347 1348 1349 1350 1351 1352
func (mt *MetaTable) IsAlias(collectionAlias string) bool {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	_, ok := mt.collAlias2ID[collectionAlias]
	return ok
}
1353 1354 1355 1356 1357 1358 1359 1360 1361 1362

// AddCredential add credential
func (mt *MetaTable) AddCredential(credInfo *internalpb.CredentialInfo) error {
	mt.credLock.Lock()
	defer mt.credLock.Unlock()

	if credInfo.Username == "" {
		return fmt.Errorf("username is empty")
	}
	k := fmt.Sprintf("%s/%s", CredentialPrefix, credInfo.Username)
C
codeman 已提交
1363 1364 1365 1366 1367 1368
	v, err := proto.Marshal(&internalpb.CredentialInfo{EncryptedPassword: credInfo.EncryptedPassword})
	if err != nil {
		log.Error("MetaTable marshal credential info fail", zap.String("key", k), zap.Error(err))
		return fmt.Errorf("metaTable marshal credential info fail key:%s, err:%w", k, err)
	}
	err = mt.txn.Save(k, string(v))
1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386
	if err != nil {
		log.Error("MetaTable save fail", zap.Error(err))
		return fmt.Errorf("save credential fail key:%s, err:%w", credInfo.Username, err)
	}
	return nil
}

// GetCredential get credential by username
func (mt *MetaTable) getCredential(username string) (*internalpb.CredentialInfo, error) {
	mt.credLock.RLock()
	defer mt.credLock.RUnlock()

	k := fmt.Sprintf("%s/%s", CredentialPrefix, username)
	v, err := mt.txn.Load(k)
	if err != nil {
		log.Warn("MetaTable load fail", zap.String("key", k), zap.Error(err))
		return nil, err
	}
C
codeman 已提交
1387 1388 1389 1390 1391 1392 1393

	credentialInfo := internalpb.CredentialInfo{}
	err = proto.Unmarshal([]byte(v), &credentialInfo)
	if err != nil {
		return nil, fmt.Errorf("get credential unmarshal err:%w", err)
	}
	return &internalpb.CredentialInfo{Username: username, EncryptedPassword: credentialInfo.EncryptedPassword}, nil
1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432
}

// DeleteCredential delete credential
func (mt *MetaTable) DeleteCredential(username string) error {
	mt.credLock.Lock()
	defer mt.credLock.Unlock()

	k := fmt.Sprintf("%s/%s", CredentialPrefix, username)

	err := mt.txn.Remove(k)
	if err != nil {
		log.Error("MetaTable remove fail", zap.Error(err))
		return fmt.Errorf("remove credential fail key:%s, err:%w", username, err)
	}
	return nil
}

// ListCredentialUsernames list credential usernames
func (mt *MetaTable) ListCredentialUsernames() (*milvuspb.ListCredUsersResponse, error) {
	mt.credLock.RLock()
	defer mt.credLock.RUnlock()

	keys, _, err := mt.txn.LoadWithPrefix(CredentialPrefix)
	if err != nil {
		log.Error("MetaTable list all credential usernames fail", zap.Error(err))
		return &milvuspb.ListCredUsersResponse{}, err
	}

	var usernames []string
	for _, path := range keys {
		username := typeutil.After(path, UserSubPrefix+"/")
		if len(username) == 0 {
			log.Warn("no username extract from path:", zap.String("path", path))
			continue
		}
		usernames = append(usernames, username)
	}
	return &milvuspb.ListCredUsersResponse{Usernames: usernames}, nil
}