meta_table.go 31.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package rootcoord
13 14

import (
S
sunby 已提交
15
	"fmt"
Z
zhenshan.cao 已提交
16
	"path"
17 18 19 20
	"strconv"
	"sync"

	"github.com/golang/protobuf/proto"
B
bigsheeper 已提交
21 22
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
29 30
)

Z
zhenshan.cao 已提交
31
// Key prefixes and operation tags used by the root coordinator's meta table.
const (
	// ComponentPrefix is the common root of every key declared below.
	ComponentPrefix = "root-coord"

	// Prefixes under which each kind of metadata record is stored.
	TenantMetaPrefix       = ComponentPrefix + "/tenant"
	ProxyMetaPrefix        = ComponentPrefix + "/proxy"
	CollectionMetaPrefix   = ComponentPrefix + "/collection"
	SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
	IndexMetaPrefix        = ComponentPrefix + "/index"

	// TimestampPrefix is the prefix for timestamp keys.
	TimestampPrefix = ComponentPrefix + "/timestamp"

	// DDOperationPrefix is the key under which a pending DD operation is saved;
	// DDMsgSendPrefix is the companion flag key (initialised to "false" by getAdditionKV).
	DDOperationPrefix = ComponentPrefix + "/dd-operation"
	DDMsgSendPrefix   = ComponentPrefix + "/dd-msg-send"

	// Tags naming the kind of DD operation.
	CreateCollectionDDType = "CreateCollection"
	DropCollectionDDType   = "DropCollection"
	CreatePartitionDDType  = "CreatePartition"
	DropPartitionDDType    = "DropPartition"
)

50
type metaTable struct {
51 52 53 54 55
	client          kv.SnapShotKV                                                   // client of a reliable kv service, i.e. etcd client
	tenantID2Meta   map[typeutil.UniqueID]pb.TenantMeta                             // tenant id to tenant meta
	proxyID2Meta    map[typeutil.UniqueID]pb.ProxyMeta                              // proxy id to proxy meta
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection name to collection id
56
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
57 58
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // collection_id/index_id -> meta
59 60 61 62 63 64

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
}

65
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) {
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	mt := &metaTable{
		client:     kv,
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

func (mt *metaTable) reloadFromKV() error {

	mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
	mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
Z
zhenshan.cao 已提交
83
	mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
84
	mt.collName2ID = make(map[string]typeutil.UniqueID)
85
	mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
86
	mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
Z
zhenshan.cao 已提交
87
	mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
88

89
	_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
90 91 92 93 94 95 96 97
	if err != nil {
		return err
	}

	for _, value := range values {
		tenantMeta := pb.TenantMeta{}
		err := proto.UnmarshalText(value, &tenantMeta)
		if err != nil {
C
Cai Yudong 已提交
98
			return fmt.Errorf("RootCoord UnmarshalText pb.TenantMeta err:%w", err)
99 100 101 102
		}
		mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
	}

103
	_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
104 105 106 107 108 109 110 111
	if err != nil {
		return err
	}

	for _, value := range values {
		proxyMeta := pb.ProxyMeta{}
		err = proto.UnmarshalText(value, &proxyMeta)
		if err != nil {
C
Cai Yudong 已提交
112
			return fmt.Errorf("RootCoord UnmarshalText pb.ProxyMeta err:%w", err)
113 114 115 116
		}
		mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
	}

117
	_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
118 119 120 121 122
	if err != nil {
		return err
	}

	for _, value := range values {
123 124
		collInfo := pb.CollectionInfo{}
		err = proto.UnmarshalText(value, &collInfo)
125
		if err != nil {
C
Cai Yudong 已提交
126
			return fmt.Errorf("RootCoord UnmarshalText pb.CollectionInfo err:%w", err)
127
		}
128 129
		mt.collID2Meta[collInfo.ID] = collInfo
		mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
130 131
	}

132
	_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
133 134 135
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
136 137 138
	for _, value := range values {
		segmentIndexInfo := pb.SegmentIndexInfo{}
		err = proto.UnmarshalText(value, &segmentIndexInfo)
139
		if err != nil {
C
Cai Yudong 已提交
140
			return fmt.Errorf("RootCoord UnmarshalText pb.SegmentIndexInfo err:%w", err)
141
		}
142 143 144 145 146 147 148 149 150 151 152 153

		// update partID2SegID
		segIDMap, ok := mt.partID2SegID[segmentIndexInfo.PartitionID]
		if ok {
			segIDMap[segmentIndexInfo.SegmentID] = true
		} else {
			idMap := make(map[typeutil.UniqueID]bool)
			idMap[segmentIndexInfo.SegmentID] = true
			mt.partID2SegID[segmentIndexInfo.PartitionID] = idMap
		}

		// update segID2IndexMeta
Z
zhenshan.cao 已提交
154
		idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
155
		if ok {
156
			idx[segmentIndexInfo.IndexID] = segmentIndexInfo
Z
zhenshan.cao 已提交
157 158 159
		} else {
			meta := make(map[typeutil.UniqueID]pb.SegmentIndexInfo)
			meta[segmentIndexInfo.IndexID] = segmentIndexInfo
160
			mt.segID2IndexMeta[segmentIndexInfo.SegmentID] = meta
161 162 163
		}
	}

164
	_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
Z
zhenshan.cao 已提交
165 166
	if err != nil {
		return err
167
	}
Z
zhenshan.cao 已提交
168 169 170 171
	for _, value := range values {
		meta := pb.IndexInfo{}
		err = proto.UnmarshalText(value, &meta)
		if err != nil {
C
Cai Yudong 已提交
172
			return fmt.Errorf("RootCoord UnmarshalText pb.IndexInfo err:%w", err)
173
		}
Z
zhenshan.cao 已提交
174
		mt.indexID2Meta[meta.IndexID] = meta
175 176
	}

Z
zhenshan.cao 已提交
177
	return nil
178 179
}

N
neza2017 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193
// getAdditionKV adapts a DD-operation generator into a callback that yields a
// key/value pair for the given timestamp. When op is nil it returns nil and
// leaves meta untouched. Otherwise it sets meta[DDMsgSendPrefix] to "false"
// and returns a callback that produces (DDOperationPrefix, op(ts)), passing
// through any error from op with empty strings.
func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
	if op == nil {
		return nil
	}
	meta[DDMsgSendPrefix] = "false"

	wrapped := func(ts typeutil.Timestamp) (string, string, error) {
		ddOp, opErr := op(ts)
		if opErr == nil {
			return DDOperationPrefix, ddOp, nil
		}
		return "", "", opErr
	}
	return wrapped
}

194
func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) {
N
neza2017 已提交
195 196 197 198 199 200
	mt.tenantLock.Lock()
	defer mt.tenantLock.Unlock()

	k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
	v := proto.MarshalTextString(te)

201 202 203
	ts, err := mt.client.Save(k, v)
	if err != nil {
		return 0, err
N
neza2017 已提交
204 205
	}
	mt.tenantID2Meta[te.ID] = *te
206
	return ts, nil
N
neza2017 已提交
207 208
}

209
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) {
N
neza2017 已提交
210 211 212 213 214 215
	mt.proxyLock.Lock()
	defer mt.proxyLock.Unlock()

	k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
	v := proto.MarshalTextString(po)

216 217 218
	ts, err := mt.client.Save(k, v)
	if err != nil {
		return 0, err
N
neza2017 已提交
219 220
	}
	mt.proxyID2Meta[po.ID] = *po
221
	return ts, nil
N
neza2017 已提交
222 223
}

224
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
225 226
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
Z
zhenshan.cao 已提交
227

228 229
	if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
		len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
230
		(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
231
		return 0, fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
232
	}
233
	if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
234
		return 0, fmt.Errorf("collection %s exist", coll.Schema.Name)
235
	}
N
neza2017 已提交
236
	if len(coll.FieldIndexes) != len(idx) {
237
		return 0, fmt.Errorf("incorrect index id when creating collection")
N
neza2017 已提交
238
	}
239

N
neza2017 已提交
240 241 242
	for _, i := range idx {
		mt.indexID2Meta[i.IndexID] = *i
	}
Z
zhenshan.cao 已提交
243

244
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
245

N
neza2017 已提交
246
	for _, i := range idx {
N
neza2017 已提交
247
		k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
N
neza2017 已提交
248 249 250 251
		v := proto.MarshalTextString(i)
		meta[k] = v
	}

252
	// save ddOpStr into etcd
N
neza2017 已提交
253
	addition := mt.getAdditionKV(ddOpStr, meta)
254 255 256 257 258 259 260 261 262 263 264 265 266 267
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.CreateTime = ts
		if len(coll.PartitionCreatedTimestamps) == 1 {
			coll.PartitionCreatedTimestamps[0] = ts
		}
		mt.collID2Meta[coll.ID] = *coll
		mt.collName2ID[coll.Schema.Name] = coll.ID
		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
		v1 := proto.MarshalTextString(coll)
		meta[k1] = v1
		return k1, v1, nil
	}

	ts, err := mt.client.MultiSave(meta, addition, saveColl)
268 269
	if err != nil {
		_ = mt.reloadFromKV()
270
		return 0, err
271
	}
272

273
	return ts, nil
274 275
}

276
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
277 278 279 280 281
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
282
		return 0, fmt.Errorf("can't find collection. id = %d", collID)
283 284
	}

Z
zhenshan.cao 已提交
285 286
	delete(mt.collID2Meta, collID)
	delete(mt.collName2ID, collMeta.Schema.Name)
287 288 289 290 291 292 293

	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				delete(mt.segID2IndexMeta, segID)
			}
Z
zhenshan.cao 已提交
294 295
		}
	}
296 297 298 299 300 301

	// update partID2SegID
	for partID := range collMeta.PartitionIDs {
		delete(mt.partID2SegID, typeutil.UniqueID(partID))
	}

N
neza2017 已提交
302 303 304 305 306 307 308 309
	for _, idxInfo := range collMeta.FieldIndexes {
		_, ok := mt.indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
			continue
		}
		delete(mt.indexID2Meta, idxInfo.IndexID)
	}
310

311
	delMetakeys := []string{
N
neza2017 已提交
312 313 314 315
		fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
		fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
		fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
	}
316

317
	// save ddOpStr into etcd
N
neza2017 已提交
318 319 320
	var saveMeta = map[string]string{}
	addition := mt.getAdditionKV(ddOpStr, saveMeta)
	ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, addition)
321 322
	if err != nil {
		_ = mt.reloadFromKV()
323
		return 0, err
324 325
	}

326
	return ts, nil
327 328
}

329
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
330 331
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
332 333 334 335 336 337 338
	if ts == 0 {
		_, ok := mt.collID2Meta[collID]
		return ok
	}
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	_, err := mt.client.Load(key, ts)
	return err == nil
339 340
}

341
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
N
neza2017 已提交
342 343 344
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

345 346 347 348 349 350 351
	if ts == 0 {
		col, ok := mt.collID2Meta[collectionID]
		if !ok {
			return nil, fmt.Errorf("can't find collection id : %d", collectionID)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
352
	}
353 354 355 356 357 358 359 360 361 362 363
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
	val, err := mt.client.Load(key, ts)
	if err != nil {
		return nil, err
	}
	colMeta := pb.CollectionInfo{}
	err = proto.UnmarshalText(val, &colMeta)
	if err != nil {
		return nil, err
	}
	return &colMeta, nil
N
neza2017 已提交
364 365
}

366
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
367 368 369
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

370 371 372 373 374 375 376 377 378 379 380
	if ts == 0 {
		vid, ok := mt.collName2ID[collectionName]
		if !ok {
			return nil, fmt.Errorf("can't find collection: " + collectionName)
		}
		col, ok := mt.collID2Meta[vid]
		if !ok {
			return nil, fmt.Errorf("can't find collection: " + collectionName)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
381
	}
382 383 384
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
		return nil, err
N
neza2017 已提交
385
	}
386 387 388 389 390 391 392 393 394 395 396 397
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
		err = proto.UnmarshalText(val, &collMeta)
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
			continue
		}
		if collMeta.Schema.Name == collectionName {
			return &collMeta, nil
		}
	}
	return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
N
neza2017 已提交
398 399
}

400
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
401 402
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
403
	colls := make(map[string]*pb.CollectionInfo)
404

405
	if ts == 0 {
406 407 408 409
		for collName, collID := range mt.collName2ID {
			coll := mt.collID2Meta[collID]
			colCopy := proto.Clone(&coll)
			colls[collName] = colCopy.(*pb.CollectionInfo)
N
neza2017 已提交
410 411
		}
		return colls, nil
412 413 414
	}
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
N
neza2017 已提交
415
		log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
416
		return nil, nil
417 418 419 420 421 422 423
	}
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
		err := proto.UnmarshalText(val, &collMeta)
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
		}
424
		colls[collMeta.Schema.Name] = &collMeta
425 426 427 428
	}
	return colls, nil
}

429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452
// ListCollectionVirtualChannels returns the virtual channel names of all
// cached collections, concatenated in map iteration order. The result is an
// empty, non-nil slice when there are none.
func (mt *metaTable) ListCollectionVirtualChannels() []string {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	channels := make([]string, 0)
	for _, coll := range mt.collID2Meta {
		for _, name := range coll.VirtualChannelNames {
			channels = append(channels, name)
		}
	}
	return channels
}

// ListCollectionPhysicalChannels returns the physical channel names of all
// cached collections, concatenated in map iteration order. The result is an
// empty, non-nil slice when there are none.
func (mt *metaTable) ListCollectionPhysicalChannels() []string {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	channels := make([]string, 0)
	for _, coll := range mt.collID2Meta {
		for _, name := range coll.PhysicalChannelNames {
			channels = append(channels, name)
		}
	}
	return channels
}

453
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
454 455 456 457
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collID2Meta[collID]
	if !ok {
458
		return 0, fmt.Errorf("can't find collection. id = %d", collID)
459 460 461
	}

	// number of partition tags (except _default) should be limited to 4096 by default
N
neza2017 已提交
462
	if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
463
		return 0, fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
464
	}
465

466 467
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
		return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
468 469
	}

470 471 472 473 474 475 476 477
	if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) {
		return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
	}

	if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
		return 0, fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
	}

478 479 480
	for idx := range coll.PartitionIDs {
		if coll.PartitionIDs[idx] == partitionID {
			return 0, fmt.Errorf("partition id = %d already exists", partitionID)
Z
zhenshan.cao 已提交
481
		}
482
		if coll.PartitionNames[idx] == partitionName {
483
			return 0, fmt.Errorf("partition name = %s already exists", partitionName)
484
		}
485
		// no necessary to check created timestamp
486
	}
487
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
488

489
	// save ddOpStr into etcd
N
neza2017 已提交
490
	addition := mt.getAdditionKV(ddOpStr, meta)
491

492 493 494 495 496 497 498 499 500 501 502 503 504 505
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
		coll.PartitionNames = append(coll.PartitionNames, partitionName)
		coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
		mt.collID2Meta[collID] = coll

		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
		v1 := proto.MarshalTextString(&coll)
		meta[k1] = v1

		return k1, v1, nil
	}

	ts, err := mt.client.MultiSave(meta, addition, saveColl)
506 507
	if err != nil {
		_ = mt.reloadFromKV()
508
		return 0, err
509
	}
510
	return ts, nil
511 512
}

513
func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
514
	if ts == 0 {
515 516
		mt.ddLock.RLock()
		defer mt.ddLock.RUnlock()
517 518
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
519
			return "", fmt.Errorf("can't find collection id = %d", collID)
520
		}
521 522
		for idx := range collMeta.PartitionIDs {
			if collMeta.PartitionIDs[idx] == partitionID {
523
				return collMeta.PartitionNames[idx], nil
524 525
			}
		}
526
		return "", fmt.Errorf("partition %d does not exist", partitionID)
527 528 529 530
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
531
		return "", err
532
	}
533
	collMeta := pb.CollectionInfo{}
534 535
	err = proto.UnmarshalText(collVal, &collMeta)
	if err != nil {
536
		return "", err
537
	}
538 539
	for idx := range collMeta.PartitionIDs {
		if collMeta.PartitionIDs[idx] == partitionID {
540
			return collMeta.PartitionNames[idx], nil
541
		}
542 543 544 545 546 547 548 549 550
	}
	return "", fmt.Errorf("partition %d does not exist", partitionID)
}

func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
	if ts == 0 {
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
			return 0, fmt.Errorf("can't find collection id = %d", collID)
551
		}
552
		for idx := range collMeta.PartitionIDs {
553
			if collMeta.PartitionNames[idx] == partitionName {
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569
				return collMeta.PartitionIDs[idx], nil
			}
		}
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
		return 0, err
	}
	collMeta := pb.CollectionInfo{}
	err = proto.UnmarshalText(collVal, &collMeta)
	if err != nil {
		return 0, err
	}
	for idx := range collMeta.PartitionIDs {
570
		if collMeta.PartitionNames[idx] == partitionName {
571
			return collMeta.PartitionIDs[idx], nil
572 573
		}
	}
574
	return 0, fmt.Errorf("partition %s does not exist", partitionName)
575 576
}

577
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
578 579
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
580
	return mt.getPartitionByName(collID, partitionName, ts)
581 582
}

583
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
584 585
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
586
	_, err := mt.getPartitionByName(collID, partitionName, ts)
587 588 589
	return err == nil
}

590
//return timestamp, partitionid, error
591
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error) {
592 593 594
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

Z
zhenshan.cao 已提交
595
	if partitionName == Params.DefaultPartitionName {
596
		return 0, 0, fmt.Errorf("default partition cannot be deleted")
597 598 599 600
	}

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
601
		return 0, 0, fmt.Errorf("can't find collection id = %d", collID)
602 603 604 605 606 607
	}

	// check tag exists
	exist := false

	pd := make([]typeutil.UniqueID, 0, len(collMeta.PartitionIDs))
608
	pn := make([]string, 0, len(collMeta.PartitionNames))
609
	pts := make([]uint64, 0, len(collMeta.PartitionCreatedTimestamps))
610 611
	var partID typeutil.UniqueID
	for idx := range collMeta.PartitionIDs {
612
		if collMeta.PartitionNames[idx] == partitionName {
613 614 615 616
			partID = collMeta.PartitionIDs[idx]
			exist = true
		} else {
			pd = append(pd, collMeta.PartitionIDs[idx])
617
			pn = append(pn, collMeta.PartitionNames[idx])
618
			pts = append(pts, collMeta.PartitionCreatedTimestamps[idx])
619 620 621
		}
	}
	if !exist {
622
		return 0, 0, fmt.Errorf("partition %s does not exist", partitionName)
623
	}
Z
zhenshan.cao 已提交
624
	collMeta.PartitionIDs = pd
625
	collMeta.PartitionNames = pn
626
	collMeta.PartitionCreatedTimestamps = pts
Z
zhenshan.cao 已提交
627
	mt.collID2Meta[collID] = collMeta
628

629 630 631 632
	// update segID2IndexMeta and partID2SegID
	if segIDMap, ok := mt.partID2SegID[partID]; ok {
		for segID := range segIDMap {
			delete(mt.segID2IndexMeta, segID)
633 634
		}
	}
635 636
	delete(mt.partID2SegID, partID)

637
	meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
638
	delMetaKeys := []string{}
N
neza2017 已提交
639
	for _, idxInfo := range collMeta.FieldIndexes {
640
		k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
N
neza2017 已提交
641 642 643
		delMetaKeys = append(delMetaKeys, k)
	}

644
	// save ddOpStr into etcd
N
neza2017 已提交
645
	addition := mt.getAdditionKV(ddOpStr, meta)
646

N
neza2017 已提交
647
	ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, addition)
648 649
	if err != nil {
		_ = mt.reloadFromKV()
650
		return 0, 0, err
651
	}
652
	return ts, partID, nil
653 654
}

655
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) {
656 657
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
658

659
	collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
660
	if !ok {
661
		return 0, fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
662
	}
663 664 665 666 667 668
	exist := false
	for _, fidx := range collMeta.FieldIndexes {
		if fidx.IndexID == segIdxInfo.IndexID {
			exist = true
			break
		}
669
	}
670 671
	if !exist {
		return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
672
	}
673

674 675 676 677
	segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
	if !ok {
		idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
		mt.segID2IndexMeta[segIdxInfo.SegmentID] = idxMap
678 679 680

		segIDMap := map[typeutil.UniqueID]bool{segIdxInfo.SegmentID: true}
		mt.partID2SegID[segIdxInfo.PartitionID] = segIDMap
681 682 683 684 685
	} else {
		tmpInfo, ok := segIdxMap[segIdxInfo.IndexID]
		if ok {
			if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
				if segIdxInfo.BuildID == tmpInfo.BuildID {
686
					log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
687
					return 0, nil
688 689
				}
				return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
690
			}
691 692 693
		}
	}

694
	mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
695 696
	mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true

697
	k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
698
	v := proto.MarshalTextString(segIdxInfo)
N
neza2017 已提交
699

700
	ts, err := mt.client.Save(k, v)
N
neza2017 已提交
701 702
	if err != nil {
		_ = mt.reloadFromKV()
703
		return 0, err
N
neza2017 已提交
704
	}
705

706
	return ts, nil
N
neza2017 已提交
707 708
}

709
//return timestamp, index id, is dropped, error
710
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error) {
N
neza2017 已提交
711 712 713 714 715
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collID, ok := mt.collName2ID[collName]
	if !ok {
716
		return 0, 0, false, fmt.Errorf("collection name = %s not exist", collName)
N
neza2017 已提交
717 718 719
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
720
		return 0, 0, false, fmt.Errorf("collection name  = %s not has meta", collName)
N
neza2017 已提交
721 722 723
	}
	fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
	if err != nil {
724
		return 0, 0, false, err
N
neza2017 已提交
725 726 727 728 729 730 731 732 733 734 735
	}
	fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
	var dropIdxID typeutil.UniqueID
	for i, info := range collMeta.FieldIndexes {
		if info.FiledID != fieldSch.FieldID {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		idxMeta, ok := mt.indexID2Meta[info.IndexID]
		if !ok {
			fieldIdxInfo = append(fieldIdxInfo, info)
N
neza2017 已提交
736
			log.Warn("index id not has meta", zap.Int64("index id", info.IndexID))
N
neza2017 已提交
737 738 739 740 741 742 743 744 745 746 747
			continue
		}
		if idxMeta.IndexName != indexName {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		dropIdxID = info.IndexID
		fieldIdxInfo = append(fieldIdxInfo, collMeta.FieldIndexes[i+1:]...)
		break
	}
	if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
N
neza2017 已提交
748
		log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
749
		return 0, 0, false, nil
N
neza2017 已提交
750 751 752 753 754 755 756
	}
	collMeta.FieldIndexes = fieldIdxInfo
	mt.collID2Meta[collID] = collMeta
	saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}

	delete(mt.indexID2Meta, dropIdxID)

757 758 759 760 761 762 763
	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				if segIndexInfos, ok := mt.segID2IndexMeta[segID]; ok {
					delete(segIndexInfos, dropIdxID)
				}
N
neza2017 已提交
764 765 766
			}
		}
	}
767

N
neza2017 已提交
768 769 770 771
	delMeta := []string{
		fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
		fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
	}
N
neza2017 已提交
772

773
	ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta)
N
neza2017 已提交
774 775
	if err != nil {
		_ = mt.reloadFromKV()
776
		return 0, 0, false, err
N
neza2017 已提交
777 778
	}

779
	return ts, dropIdxID, true, nil
N
neza2017 已提交
780 781
}

N
neza2017 已提交
782 783 784 785 786 787
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	segIdxMap, ok := mt.segID2IndexMeta[segID]
	if !ok {
788 789 790 791 792 793 794
		return pb.SegmentIndexInfo{
			SegmentID:   segID,
			FieldID:     filedID,
			IndexID:     0,
			BuildID:     0,
			EnableIndex: false,
		}, nil
N
neza2017 已提交
795
	}
796
	if len(segIdxMap) == 0 {
S
sunby 已提交
797
		return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
N
neza2017 已提交
798 799
	}

B
bigsheeper 已提交
800
	if filedID == -1 && idxName == "" { // return default index
801
		for _, seg := range segIdxMap {
B
bigsheeper 已提交
802 803 804 805
			info, ok := mt.indexID2Meta[seg.IndexID]
			if ok && info.IndexName == Params.DefaultIndexName {
				return seg, nil
			}
N
neza2017 已提交
806 807
		}
	} else {
808
		for idxID, seg := range segIdxMap {
N
neza2017 已提交
809 810 811 812 813 814 815 816 817 818 819 820
			idxMeta, ok := mt.indexID2Meta[idxID]
			if ok {
				if idxMeta.IndexName != idxName {
					continue
				}
				if seg.FieldID != filedID {
					continue
				}
				return seg, nil
			}
		}
	}
S
sunby 已提交
821
	return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
N
neza2017 已提交
822 823 824 825 826 827
}

func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

N
neza2017 已提交
828 829 830 831
	return mt.unlockGetFieldSchema(collName, fieldName)
}

func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
832 833
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
834
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
835 836 837
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
838
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
839 840 841 842 843 844 845
	}

	for _, field := range collMeta.Schema.Fields {
		if field.Name == fieldName {
			return *field, nil
		}
	}
S
sunby 已提交
846
	return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
N
neza2017 已提交
847 848 849 850 851 852
}

//return true/false
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
853 854 855 856
	return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}

func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
857 858 859 860 861
	segIdx, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return false
	}
	exist := false
862
	for idxID, meta := range segIdx {
N
neza2017 已提交
863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878
		if meta.FieldID != fieldSchema.FieldID {
			continue
		}
		idxMeta, ok := mt.indexID2Meta[idxID]
		if !ok {
			continue
		}
		if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
			exist = true
			break
		}
	}
	return exist
}

// return segment ids, type params, error
879
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
N
neza2017 已提交
880 881 882
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

N
neza2017 已提交
883 884 885
	if idxInfo.IndexParams == nil {
		return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
	}
N
neza2017 已提交
886 887
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
888
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
889 890 891
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
892
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
893
	}
N
neza2017 已提交
894
	fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
N
neza2017 已提交
895 896 897 898
	if err != nil {
		return nil, fieldSchema, err
	}

N
neza2017 已提交
899 900
	var dupIdx typeutil.UniqueID = 0
	for _, f := range collMeta.FieldIndexes {
901 902 903 904
		if info, ok := mt.indexID2Meta[f.IndexID]; ok {
			if info.IndexName == idxInfo.IndexName {
				dupIdx = info.IndexID
				break
N
neza2017 已提交
905 906 907
			}
		}
	}
N
neza2017 已提交
908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923

	exist := false
	var existInfo pb.IndexInfo
	for _, f := range collMeta.FieldIndexes {
		if f.FiledID == fieldSchema.FieldID {
			existInfo, ok = mt.indexID2Meta[f.IndexID]
			if !ok {
				return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
			}
			if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
				exist = true
				break
			}
		}
	}
	if !exist {
N
neza2017 已提交
924
		idx := &pb.FieldIndexInfo{
N
neza2017 已提交
925 926
			FiledID: fieldSchema.FieldID,
			IndexID: idxInfo.IndexID,
N
neza2017 已提交
927 928
		}
		collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
N
neza2017 已提交
929 930 931 932
		mt.collID2Meta[collMeta.ID] = collMeta
		k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
		v1 := proto.MarshalTextString(&collMeta)

N
neza2017 已提交
933 934
		mt.indexID2Meta[idx.IndexID] = *idxInfo
		k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
Z
zhenshan.cao 已提交
935
		v2 := proto.MarshalTextString(idxInfo)
N
neza2017 已提交
936 937
		meta := map[string]string{k1: v1, k2: v2}

N
neza2017 已提交
938 939 940 941 942 943 944 945 946
		if dupIdx != 0 {
			dupInfo := mt.indexID2Meta[dupIdx]
			dupInfo.IndexName = dupInfo.IndexName + "_bak"
			mt.indexID2Meta[dupIdx] = dupInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
			v := proto.MarshalTextString(&dupInfo)
			meta[k] = v
		}

947
		_, err = mt.client.MultiSave(meta)
N
neza2017 已提交
948 949 950 951 952
		if err != nil {
			_ = mt.reloadFromKV()
			return nil, schemapb.FieldSchema{}, err
		}

N
neza2017 已提交
953 954 955 956 957 958 959
	} else {
		idxInfo.IndexID = existInfo.IndexID
		if existInfo.IndexName != idxInfo.IndexName { //replace index name
			existInfo.IndexName = idxInfo.IndexName
			mt.indexID2Meta[existInfo.IndexID] = existInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
			v := proto.MarshalTextString(&existInfo)
N
neza2017 已提交
960 961 962 963 964 965 966 967 968 969
			meta := map[string]string{k: v}
			if dupIdx != 0 {
				dupInfo := mt.indexID2Meta[dupIdx]
				dupInfo.IndexName = dupInfo.IndexName + "_bak"
				mt.indexID2Meta[dupIdx] = dupInfo
				k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
				v := proto.MarshalTextString(&dupInfo)
				meta[k] = v
			}

970
			_, err = mt.client.MultiSave(meta)
N
neza2017 已提交
971 972 973 974 975
			if err != nil {
				_ = mt.reloadFromKV()
				return nil, schemapb.FieldSchema{}, err
			}
		}
N
neza2017 已提交
976 977
	}

N
neza2017 已提交
978
	rstID := make([]typeutil.UniqueID, 0, 16)
979 980 981
	for _, segID := range segIDs {
		if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
			rstID = append(rstID, segID)
N
neza2017 已提交
982 983 984 985 986
		}
	}
	return rstID, fieldSchema, nil
}

987
func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
N
neza2017 已提交
988
	mt.ddLock.RLock()
S
sunby 已提交
989
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
990 991 992

	collID, ok := mt.collName2ID[collName]
	if !ok {
993
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
994 995 996
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
997
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
998 999
	}

N
neza2017 已提交
1000
	rstIndex := make([]pb.IndexInfo, 0, len(collMeta.FieldIndexes))
Z
zhenshan.cao 已提交
1001
	for _, idx := range collMeta.FieldIndexes {
1002 1003
		idxInfo, ok := mt.indexID2Meta[idx.IndexID]
		if !ok {
1004
			return pb.CollectionInfo{}, nil, fmt.Errorf("index id = %d not found", idx.IndexID)
1005 1006 1007
		}
		if indexName == "" || idxInfo.IndexName == indexName {
			rstIndex = append(rstIndex, idxInfo)
N
neza2017 已提交
1008 1009
		}
	}
1010
	return collMeta, rstIndex, nil
N
neza2017 已提交
1011
}
B
bigsheeper 已提交
1012 1013 1014

func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
	mt.ddLock.RLock()
S
sunby 已提交
1015
	defer mt.ddLock.RUnlock()
B
bigsheeper 已提交
1016 1017 1018

	indexInfo, ok := mt.indexID2Meta[indexID]
	if !ok {
S
sunby 已提交
1019
		return nil, fmt.Errorf("cannot find index, id = %d", indexID)
B
bigsheeper 已提交
1020 1021 1022
	}
	return &indexInfo, nil
}
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048

// dupMeta returns fresh copies of the collID2Meta, segID2IndexMeta and
// indexID2Meta maps, taken while holding the read side of ddLock. The nested
// per-segment maps of segID2IndexMeta are copied as well.
//
// The copy is shallow at the value level: entries are copied by assignment, so
// slice and pointer fields inside the values (for example
// CollectionInfo.FieldIndexes) still share storage with the originals.
func (mt *metaTable) dupMeta() (
	map[typeutil.UniqueID]pb.CollectionInfo,
	map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
	map[typeutil.UniqueID]pb.IndexInfo,
) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	collCopy := make(map[typeutil.UniqueID]pb.CollectionInfo, len(mt.collID2Meta))
	for collID, collInfo := range mt.collID2Meta {
		collCopy[collID] = collInfo
	}

	segCopy := make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo, len(mt.segID2IndexMeta))
	for segID, byIndex := range mt.segID2IndexMeta {
		inner := make(map[typeutil.UniqueID]pb.SegmentIndexInfo, len(byIndex))
		for indexID, segIndex := range byIndex {
			inner[indexID] = segIndex
		}
		segCopy[segID] = inner
	}

	indexCopy := make(map[typeutil.UniqueID]pb.IndexInfo, len(mt.indexID2Meta))
	for indexID, indexInfo := range mt.indexID2Meta {
		indexCopy[indexID] = indexInfo
	}

	return collCopy, segCopy, indexCopy
}