meta_table.go 32.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package rootcoord
13 14

import (
S
sunby 已提交
15
	"fmt"
Z
zhenshan.cao 已提交
16
	"path"
17 18 19 20
	"strconv"
	"sync"

	"github.com/golang/protobuf/proto"
B
bigsheeper 已提交
21 22
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
29 30
)

Z
zhenshan.cao 已提交
31
const (
N
neza2017 已提交
32
	ComponentPrefix        = "root-coord"
N
neza2017 已提交
33 34 35 36 37
	TenantMetaPrefix       = ComponentPrefix + "/tenant"
	ProxyMetaPrefix        = ComponentPrefix + "/proxy"
	CollectionMetaPrefix   = ComponentPrefix + "/collection"
	SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
	IndexMetaPrefix        = ComponentPrefix + "/index"
38

39 40
	TimestampPrefix = ComponentPrefix + "/timestamp"

41 42
	DDOperationPrefix = ComponentPrefix + "/dd-operation"
	DDMsgSendPrefix   = ComponentPrefix + "/dd-msg-send"
43

44 45 46 47
	CreateCollectionDDType = "CreateCollection"
	DropCollectionDDType   = "DropCollection"
	CreatePartitionDDType  = "CreatePartition"
	DropPartitionDDType    = "DropPartition"
Z
zhenshan.cao 已提交
48 49
)

50
type metaTable struct {
51 52 53 54 55
	client          kv.SnapShotKV                                                   // client of a reliable kv service, i.e. etcd client
	tenantID2Meta   map[typeutil.UniqueID]pb.TenantMeta                             // tenant id to tenant meta
	proxyID2Meta    map[typeutil.UniqueID]pb.ProxyMeta                              // proxy id to proxy meta
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection name to collection id
56
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
57 58
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // collection_id/index_id -> meta
59 60 61 62 63 64

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
}

65
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) {
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	mt := &metaTable{
		client:     kv,
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

func (mt *metaTable) reloadFromKV() error {

	mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
	mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
Z
zhenshan.cao 已提交
83
	mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
84
	mt.collName2ID = make(map[string]typeutil.UniqueID)
85
	mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
86
	mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
Z
zhenshan.cao 已提交
87
	mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
88

89
	_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
90 91 92 93 94 95 96 97
	if err != nil {
		return err
	}

	for _, value := range values {
		tenantMeta := pb.TenantMeta{}
		err := proto.UnmarshalText(value, &tenantMeta)
		if err != nil {
C
Cai Yudong 已提交
98
			return fmt.Errorf("RootCoord UnmarshalText pb.TenantMeta err:%w", err)
99 100 101 102
		}
		mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
	}

103
	_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
104 105 106 107 108 109 110 111
	if err != nil {
		return err
	}

	for _, value := range values {
		proxyMeta := pb.ProxyMeta{}
		err = proto.UnmarshalText(value, &proxyMeta)
		if err != nil {
C
Cai Yudong 已提交
112
			return fmt.Errorf("RootCoord UnmarshalText pb.ProxyMeta err:%w", err)
113 114 115 116
		}
		mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
	}

117
	_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
118 119 120 121 122
	if err != nil {
		return err
	}

	for _, value := range values {
123 124
		collInfo := pb.CollectionInfo{}
		err = proto.UnmarshalText(value, &collInfo)
125
		if err != nil {
C
Cai Yudong 已提交
126
			return fmt.Errorf("RootCoord UnmarshalText pb.CollectionInfo err:%w", err)
127
		}
128 129
		mt.collID2Meta[collInfo.ID] = collInfo
		mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
130 131
	}

132
	_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
133 134 135
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
136 137 138
	for _, value := range values {
		segmentIndexInfo := pb.SegmentIndexInfo{}
		err = proto.UnmarshalText(value, &segmentIndexInfo)
139
		if err != nil {
C
Cai Yudong 已提交
140
			return fmt.Errorf("RootCoord UnmarshalText pb.SegmentIndexInfo err:%w", err)
141
		}
142 143 144 145 146 147 148 149 150 151 152 153

		// update partID2SegID
		segIDMap, ok := mt.partID2SegID[segmentIndexInfo.PartitionID]
		if ok {
			segIDMap[segmentIndexInfo.SegmentID] = true
		} else {
			idMap := make(map[typeutil.UniqueID]bool)
			idMap[segmentIndexInfo.SegmentID] = true
			mt.partID2SegID[segmentIndexInfo.PartitionID] = idMap
		}

		// update segID2IndexMeta
Z
zhenshan.cao 已提交
154
		idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
155
		if ok {
156
			idx[segmentIndexInfo.IndexID] = segmentIndexInfo
Z
zhenshan.cao 已提交
157 158 159
		} else {
			meta := make(map[typeutil.UniqueID]pb.SegmentIndexInfo)
			meta[segmentIndexInfo.IndexID] = segmentIndexInfo
160
			mt.segID2IndexMeta[segmentIndexInfo.SegmentID] = meta
161 162 163
		}
	}

164
	_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
Z
zhenshan.cao 已提交
165 166
	if err != nil {
		return err
167
	}
Z
zhenshan.cao 已提交
168 169 170 171
	for _, value := range values {
		meta := pb.IndexInfo{}
		err = proto.UnmarshalText(value, &meta)
		if err != nil {
C
Cai Yudong 已提交
172
			return fmt.Errorf("RootCoord UnmarshalText pb.IndexInfo err:%w", err)
173
		}
Z
zhenshan.cao 已提交
174
		mt.indexID2Meta[meta.IndexID] = meta
175 176
	}

Z
zhenshan.cao 已提交
177
	return nil
178 179
}

N
neza2017 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193
func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
	if op == nil {
		return nil
	}
	meta[DDMsgSendPrefix] = "false"
	return func(ts typeutil.Timestamp) (string, string, error) {
		val, err := op(ts)
		if err != nil {
			return "", "", err
		}
		return DDOperationPrefix, val, nil
	}
}

194
func (mt *metaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
195 196 197 198 199 200
	mt.tenantLock.Lock()
	defer mt.tenantLock.Unlock()

	k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
	v := proto.MarshalTextString(te)

201
	err := mt.client.Save(k, v, ts)
202
	if err != nil {
203 204
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
205 206
	}
	mt.tenantID2Meta[te.ID] = *te
207
	return nil
N
neza2017 已提交
208 209
}

210
func (mt *metaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
211 212 213 214 215 216
	mt.proxyLock.Lock()
	defer mt.proxyLock.Unlock()

	k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
	v := proto.MarshalTextString(po)

217
	err := mt.client.Save(k, v, ts)
218
	if err != nil {
219 220
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
221 222
	}
	mt.proxyID2Meta[po.ID] = *po
223
	return nil
N
neza2017 已提交
224 225
}

226
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
227 228
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
Z
zhenshan.cao 已提交
229

230 231
	if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
		len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
232
		(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
233
		return fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
234
	}
235
	if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
236
		return fmt.Errorf("collection %s exist", coll.Schema.Name)
237
	}
N
neza2017 已提交
238
	if len(coll.FieldIndexes) != len(idx) {
239
		return fmt.Errorf("incorrect index id when creating collection")
N
neza2017 已提交
240
	}
241

N
neza2017 已提交
242 243 244
	for _, i := range idx {
		mt.indexID2Meta[i.IndexID] = *i
	}
Z
zhenshan.cao 已提交
245

246
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
247

N
neza2017 已提交
248
	for _, i := range idx {
N
neza2017 已提交
249
		k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
N
neza2017 已提交
250 251 252 253
		v := proto.MarshalTextString(i)
		meta[k] = v
	}

254
	// save ddOpStr into etcd
N
neza2017 已提交
255
	addition := mt.getAdditionKV(ddOpStr, meta)
256 257 258 259 260 261 262 263 264 265 266 267 268
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.CreateTime = ts
		if len(coll.PartitionCreatedTimestamps) == 1 {
			coll.PartitionCreatedTimestamps[0] = ts
		}
		mt.collID2Meta[coll.ID] = *coll
		mt.collName2ID[coll.Schema.Name] = coll.ID
		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
		v1 := proto.MarshalTextString(coll)
		meta[k1] = v1
		return k1, v1, nil
	}

269
	err := mt.client.MultiSave(meta, ts, addition, saveColl)
270
	if err != nil {
271 272
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
273
	}
274

275
	return nil
276 277
}

278
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
279 280 281 282 283
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
284
		return fmt.Errorf("can't find collection. id = %d", collID)
285 286
	}

Z
zhenshan.cao 已提交
287 288
	delete(mt.collID2Meta, collID)
	delete(mt.collName2ID, collMeta.Schema.Name)
289 290 291 292 293 294 295

	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				delete(mt.segID2IndexMeta, segID)
			}
Z
zhenshan.cao 已提交
296 297
		}
	}
298 299 300 301 302 303

	// update partID2SegID
	for partID := range collMeta.PartitionIDs {
		delete(mt.partID2SegID, typeutil.UniqueID(partID))
	}

N
neza2017 已提交
304 305 306 307 308 309 310 311
	for _, idxInfo := range collMeta.FieldIndexes {
		_, ok := mt.indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
			continue
		}
		delete(mt.indexID2Meta, idxInfo.IndexID)
	}
312

313
	delMetakeys := []string{
N
neza2017 已提交
314 315 316 317
		fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
		fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
		fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
	}
318

319
	// save ddOpStr into etcd
N
neza2017 已提交
320 321
	var saveMeta = map[string]string{}
	addition := mt.getAdditionKV(ddOpStr, saveMeta)
322
	err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition)
323
	if err != nil {
324 325
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
326 327
	}

328
	return nil
329 330
}

331
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
332 333
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
334 335 336 337 338 339 340
	if ts == 0 {
		_, ok := mt.collID2Meta[collID]
		return ok
	}
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	_, err := mt.client.Load(key, ts)
	return err == nil
341 342
}

343
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
N
neza2017 已提交
344 345 346
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

347 348 349 350 351 352 353
	if ts == 0 {
		col, ok := mt.collID2Meta[collectionID]
		if !ok {
			return nil, fmt.Errorf("can't find collection id : %d", collectionID)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
354
	}
355 356 357 358 359 360 361 362 363 364 365
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
	val, err := mt.client.Load(key, ts)
	if err != nil {
		return nil, err
	}
	colMeta := pb.CollectionInfo{}
	err = proto.UnmarshalText(val, &colMeta)
	if err != nil {
		return nil, err
	}
	return &colMeta, nil
N
neza2017 已提交
366 367
}

368
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
369 370 371
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

372 373 374 375 376 377 378 379 380 381 382
	if ts == 0 {
		vid, ok := mt.collName2ID[collectionName]
		if !ok {
			return nil, fmt.Errorf("can't find collection: " + collectionName)
		}
		col, ok := mt.collID2Meta[vid]
		if !ok {
			return nil, fmt.Errorf("can't find collection: " + collectionName)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
383
	}
384 385 386
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
		return nil, err
N
neza2017 已提交
387
	}
388 389 390 391 392 393 394 395 396 397 398 399
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
		err = proto.UnmarshalText(val, &collMeta)
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
			continue
		}
		if collMeta.Schema.Name == collectionName {
			return &collMeta, nil
		}
	}
	return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
N
neza2017 已提交
400 401
}

402
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
403 404
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
405
	colls := make(map[string]*pb.CollectionInfo)
406

407
	if ts == 0 {
408 409 410 411
		for collName, collID := range mt.collName2ID {
			coll := mt.collID2Meta[collID]
			colCopy := proto.Clone(&coll)
			colls[collName] = colCopy.(*pb.CollectionInfo)
N
neza2017 已提交
412 413
		}
		return colls, nil
414 415 416
	}
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
N
neza2017 已提交
417
		log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
418
		return nil, nil
419 420 421 422 423 424 425
	}
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
		err := proto.UnmarshalText(val, &collMeta)
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
		}
426
		colls[collMeta.Schema.Name] = &collMeta
427 428 429 430
	}
	return colls, nil
}

431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454
// ListCollectionVirtualChannels list virtual channel of all the collection
func (mt *metaTable) ListCollectionVirtualChannels() []string {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	vlist := []string{}

	for _, c := range mt.collID2Meta {
		vlist = append(vlist, c.VirtualChannelNames...)
	}
	return vlist
}

// ListCollectionPhysicalChannels list physical channel of all the collection
func (mt *metaTable) ListCollectionPhysicalChannels() []string {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	plist := []string{}

	for _, c := range mt.collID2Meta {
		plist = append(plist, c.PhysicalChannelNames...)
	}
	return plist
}

455
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
456 457 458 459
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collID2Meta[collID]
	if !ok {
460
		return fmt.Errorf("can't find collection. id = %d", collID)
461 462 463
	}

	// number of partition tags (except _default) should be limited to 4096 by default
N
neza2017 已提交
464
	if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
465
		return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
466
	}
467

468
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
469
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
470 471
	}

472
	if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) {
473
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
474 475 476
	}

	if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
477
		return fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
478 479
	}

480 481
	for idx := range coll.PartitionIDs {
		if coll.PartitionIDs[idx] == partitionID {
482
			return fmt.Errorf("partition id = %d already exists", partitionID)
Z
zhenshan.cao 已提交
483
		}
484
		if coll.PartitionNames[idx] == partitionName {
485
			return fmt.Errorf("partition name = %s already exists", partitionName)
486
		}
487
		// no necessary to check created timestamp
488
	}
489
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
490

491
	// save ddOpStr into etcd
N
neza2017 已提交
492
	addition := mt.getAdditionKV(ddOpStr, meta)
493

494 495 496 497 498 499 500 501 502 503 504 505 506
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
		coll.PartitionNames = append(coll.PartitionNames, partitionName)
		coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
		mt.collID2Meta[collID] = coll

		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
		v1 := proto.MarshalTextString(&coll)
		meta[k1] = v1

		return k1, v1, nil
	}

507
	err := mt.client.MultiSave(meta, ts, addition, saveColl)
508
	if err != nil {
509 510
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
511
	}
512
	return nil
513 514
}

515
func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
516
	if ts == 0 {
517 518
		mt.ddLock.RLock()
		defer mt.ddLock.RUnlock()
519 520
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
521
			return "", fmt.Errorf("can't find collection id = %d", collID)
522
		}
523 524
		for idx := range collMeta.PartitionIDs {
			if collMeta.PartitionIDs[idx] == partitionID {
525
				return collMeta.PartitionNames[idx], nil
526 527
			}
		}
528
		return "", fmt.Errorf("partition %d does not exist", partitionID)
529 530 531 532
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
533
		return "", err
534
	}
535
	collMeta := pb.CollectionInfo{}
536 537
	err = proto.UnmarshalText(collVal, &collMeta)
	if err != nil {
538
		return "", err
539
	}
540 541
	for idx := range collMeta.PartitionIDs {
		if collMeta.PartitionIDs[idx] == partitionID {
542
			return collMeta.PartitionNames[idx], nil
543
		}
544 545 546 547 548 549 550 551 552
	}
	return "", fmt.Errorf("partition %d does not exist", partitionID)
}

func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
	if ts == 0 {
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
			return 0, fmt.Errorf("can't find collection id = %d", collID)
553
		}
554
		for idx := range collMeta.PartitionIDs {
555
			if collMeta.PartitionNames[idx] == partitionName {
556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
				return collMeta.PartitionIDs[idx], nil
			}
		}
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
		return 0, err
	}
	collMeta := pb.CollectionInfo{}
	err = proto.UnmarshalText(collVal, &collMeta)
	if err != nil {
		return 0, err
	}
	for idx := range collMeta.PartitionIDs {
572
		if collMeta.PartitionNames[idx] == partitionName {
573
			return collMeta.PartitionIDs[idx], nil
574 575
		}
	}
576
	return 0, fmt.Errorf("partition %s does not exist", partitionName)
577 578
}

579
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
580 581
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
582
	return mt.getPartitionByName(collID, partitionName, ts)
583 584
}

585
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
586 587
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
588
	_, err := mt.getPartitionByName(collID, partitionName, ts)
589 590 591
	return err == nil
}

592
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
593 594 595
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

Z
zhenshan.cao 已提交
596
	if partitionName == Params.DefaultPartitionName {
597
		return 0, fmt.Errorf("default partition cannot be deleted")
598 599 600 601
	}

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
602
		return 0, fmt.Errorf("can't find collection id = %d", collID)
603 604 605 606 607 608
	}

	// check tag exists
	exist := false

	pd := make([]typeutil.UniqueID, 0, len(collMeta.PartitionIDs))
609
	pn := make([]string, 0, len(collMeta.PartitionNames))
610
	pts := make([]uint64, 0, len(collMeta.PartitionCreatedTimestamps))
611 612
	var partID typeutil.UniqueID
	for idx := range collMeta.PartitionIDs {
613
		if collMeta.PartitionNames[idx] == partitionName {
614 615 616 617
			partID = collMeta.PartitionIDs[idx]
			exist = true
		} else {
			pd = append(pd, collMeta.PartitionIDs[idx])
618
			pn = append(pn, collMeta.PartitionNames[idx])
619
			pts = append(pts, collMeta.PartitionCreatedTimestamps[idx])
620 621 622
		}
	}
	if !exist {
623
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
624
	}
Z
zhenshan.cao 已提交
625
	collMeta.PartitionIDs = pd
626
	collMeta.PartitionNames = pn
627
	collMeta.PartitionCreatedTimestamps = pts
Z
zhenshan.cao 已提交
628
	mt.collID2Meta[collID] = collMeta
629

630 631 632 633
	// update segID2IndexMeta and partID2SegID
	if segIDMap, ok := mt.partID2SegID[partID]; ok {
		for segID := range segIDMap {
			delete(mt.segID2IndexMeta, segID)
634 635
		}
	}
636 637
	delete(mt.partID2SegID, partID)

638
	meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
639
	delMetaKeys := []string{}
N
neza2017 已提交
640
	for _, idxInfo := range collMeta.FieldIndexes {
641
		k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
N
neza2017 已提交
642 643 644
		delMetaKeys = append(delMetaKeys, k)
	}

645
	// save ddOpStr into etcd
N
neza2017 已提交
646
	addition := mt.getAdditionKV(ddOpStr, meta)
647

648
	err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
649
	if err != nil {
650 651
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
652
	}
653
	return partID, nil
654 655
}

656
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
657 658
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
659

660
	collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
661
	if !ok {
662
		return fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
663
	}
664 665 666 667 668 669
	exist := false
	for _, fidx := range collMeta.FieldIndexes {
		if fidx.IndexID == segIdxInfo.IndexID {
			exist = true
			break
		}
670
	}
671
	if !exist {
672
		return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
673
	}
674

675 676 677 678
	segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
	if !ok {
		idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
		mt.segID2IndexMeta[segIdxInfo.SegmentID] = idxMap
679 680 681

		segIDMap := map[typeutil.UniqueID]bool{segIdxInfo.SegmentID: true}
		mt.partID2SegID[segIdxInfo.PartitionID] = segIDMap
682 683 684 685 686
	} else {
		tmpInfo, ok := segIdxMap[segIdxInfo.IndexID]
		if ok {
			if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
				if segIdxInfo.BuildID == tmpInfo.BuildID {
687
					log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
688
					return nil
689
				}
690
				return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
691
			}
692 693 694
		}
	}

695
	mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
696 697
	mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true

698
	k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
699
	v := proto.MarshalTextString(segIdxInfo)
N
neza2017 已提交
700

701
	err := mt.client.Save(k, v, ts)
N
neza2017 已提交
702
	if err != nil {
703 704
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
705
	}
706

707
	return nil
N
neza2017 已提交
708 709
}

710
//return timestamp, index id, is dropped, error
711
func (mt *metaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
N
neza2017 已提交
712 713 714 715 716
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collID, ok := mt.collName2ID[collName]
	if !ok {
717
		return 0, false, fmt.Errorf("collection name = %s not exist", collName)
N
neza2017 已提交
718 719 720
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
721
		return 0, false, fmt.Errorf("collection name  = %s not has meta", collName)
N
neza2017 已提交
722 723 724
	}
	fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
	if err != nil {
725
		return 0, false, err
N
neza2017 已提交
726 727 728 729 730 731 732 733 734 735 736
	}
	fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
	var dropIdxID typeutil.UniqueID
	for i, info := range collMeta.FieldIndexes {
		if info.FiledID != fieldSch.FieldID {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		idxMeta, ok := mt.indexID2Meta[info.IndexID]
		if !ok {
			fieldIdxInfo = append(fieldIdxInfo, info)
N
neza2017 已提交
737
			log.Warn("index id not has meta", zap.Int64("index id", info.IndexID))
N
neza2017 已提交
738 739 740 741 742 743 744 745 746 747 748
			continue
		}
		if idxMeta.IndexName != indexName {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		dropIdxID = info.IndexID
		fieldIdxInfo = append(fieldIdxInfo, collMeta.FieldIndexes[i+1:]...)
		break
	}
	if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
N
neza2017 已提交
749
		log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
750
		return 0, false, nil
N
neza2017 已提交
751 752 753 754 755 756 757
	}
	collMeta.FieldIndexes = fieldIdxInfo
	mt.collID2Meta[collID] = collMeta
	saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}

	delete(mt.indexID2Meta, dropIdxID)

758 759 760 761 762 763 764
	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				if segIndexInfos, ok := mt.segID2IndexMeta[segID]; ok {
					delete(segIndexInfos, dropIdxID)
				}
N
neza2017 已提交
765 766 767
			}
		}
	}
768

N
neza2017 已提交
769 770 771 772
	delMeta := []string{
		fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
		fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
	}
N
neza2017 已提交
773

774
	err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, ts)
N
neza2017 已提交
775
	if err != nil {
776 777
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
N
neza2017 已提交
778 779
	}

780
	return dropIdxID, true, nil
N
neza2017 已提交
781 782
}

N
neza2017 已提交
783 784 785 786 787 788
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	segIdxMap, ok := mt.segID2IndexMeta[segID]
	if !ok {
789 790 791 792 793 794 795
		return pb.SegmentIndexInfo{
			SegmentID:   segID,
			FieldID:     filedID,
			IndexID:     0,
			BuildID:     0,
			EnableIndex: false,
		}, nil
N
neza2017 已提交
796
	}
797
	if len(segIdxMap) == 0 {
S
sunby 已提交
798
		return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
N
neza2017 已提交
799 800
	}

B
bigsheeper 已提交
801
	if filedID == -1 && idxName == "" { // return default index
802
		for _, seg := range segIdxMap {
B
bigsheeper 已提交
803 804 805 806
			info, ok := mt.indexID2Meta[seg.IndexID]
			if ok && info.IndexName == Params.DefaultIndexName {
				return seg, nil
			}
N
neza2017 已提交
807 808
		}
	} else {
809
		for idxID, seg := range segIdxMap {
N
neza2017 已提交
810 811 812 813 814 815 816 817 818 819 820 821
			idxMeta, ok := mt.indexID2Meta[idxID]
			if ok {
				if idxMeta.IndexName != idxName {
					continue
				}
				if seg.FieldID != filedID {
					continue
				}
				return seg, nil
			}
		}
	}
S
sunby 已提交
822
	return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
N
neza2017 已提交
823 824 825 826 827 828
}

func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

N
neza2017 已提交
829 830 831 832
	return mt.unlockGetFieldSchema(collName, fieldName)
}

func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
833 834
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
835
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
836 837 838
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
839
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
840 841 842 843 844 845 846
	}

	for _, field := range collMeta.Schema.Fields {
		if field.Name == fieldName {
			return *field, nil
		}
	}
S
sunby 已提交
847
	return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
N
neza2017 已提交
848 849 850 851 852 853
}

//return true/false
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
854 855 856 857
	return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}

func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
858 859 860 861 862
	segIdx, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return false
	}
	exist := false
863
	for idxID, meta := range segIdx {
N
neza2017 已提交
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879
		if meta.FieldID != fieldSchema.FieldID {
			continue
		}
		idxMeta, ok := mt.indexID2Meta[idxID]
		if !ok {
			continue
		}
		if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
			exist = true
			break
		}
	}
	return exist
}

// return segment ids, type params, error
880
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
N
neza2017 已提交
881 882 883
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

N
neza2017 已提交
884 885 886
	if idxInfo.IndexParams == nil {
		return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
	}
N
neza2017 已提交
887 888
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
889
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
890 891 892
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
893
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
894
	}
N
neza2017 已提交
895
	fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
N
neza2017 已提交
896 897 898 899
	if err != nil {
		return nil, fieldSchema, err
	}

N
neza2017 已提交
900 901
	var dupIdx typeutil.UniqueID = 0
	for _, f := range collMeta.FieldIndexes {
902 903 904 905
		if info, ok := mt.indexID2Meta[f.IndexID]; ok {
			if info.IndexName == idxInfo.IndexName {
				dupIdx = info.IndexID
				break
N
neza2017 已提交
906 907 908
			}
		}
	}
N
neza2017 已提交
909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924

	exist := false
	var existInfo pb.IndexInfo
	for _, f := range collMeta.FieldIndexes {
		if f.FiledID == fieldSchema.FieldID {
			existInfo, ok = mt.indexID2Meta[f.IndexID]
			if !ok {
				return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
			}
			if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
				exist = true
				break
			}
		}
	}
	if !exist {
N
neza2017 已提交
925
		idx := &pb.FieldIndexInfo{
N
neza2017 已提交
926 927
			FiledID: fieldSchema.FieldID,
			IndexID: idxInfo.IndexID,
N
neza2017 已提交
928 929
		}
		collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
N
neza2017 已提交
930 931 932 933
		mt.collID2Meta[collMeta.ID] = collMeta
		k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
		v1 := proto.MarshalTextString(&collMeta)

N
neza2017 已提交
934 935
		mt.indexID2Meta[idx.IndexID] = *idxInfo
		k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
Z
zhenshan.cao 已提交
936
		v2 := proto.MarshalTextString(idxInfo)
N
neza2017 已提交
937 938
		meta := map[string]string{k1: v1, k2: v2}

N
neza2017 已提交
939 940 941 942 943 944 945 946 947
		if dupIdx != 0 {
			dupInfo := mt.indexID2Meta[dupIdx]
			dupInfo.IndexName = dupInfo.IndexName + "_bak"
			mt.indexID2Meta[dupIdx] = dupInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
			v := proto.MarshalTextString(&dupInfo)
			meta[k] = v
		}

948
		err = mt.client.MultiSave(meta, 0)
N
neza2017 已提交
949
		if err != nil {
950 951
			log.Error("SnapShotKV MultiSave fail", zap.Error(err))
			panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
952
		}
N
neza2017 已提交
953 954 955 956 957 958 959
	} else {
		idxInfo.IndexID = existInfo.IndexID
		if existInfo.IndexName != idxInfo.IndexName { //replace index name
			existInfo.IndexName = idxInfo.IndexName
			mt.indexID2Meta[existInfo.IndexID] = existInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
			v := proto.MarshalTextString(&existInfo)
N
neza2017 已提交
960 961 962 963 964 965 966 967 968 969
			meta := map[string]string{k: v}
			if dupIdx != 0 {
				dupInfo := mt.indexID2Meta[dupIdx]
				dupInfo.IndexName = dupInfo.IndexName + "_bak"
				mt.indexID2Meta[dupIdx] = dupInfo
				k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
				v := proto.MarshalTextString(&dupInfo)
				meta[k] = v
			}

970
			err = mt.client.MultiSave(meta, 0)
N
neza2017 已提交
971
			if err != nil {
972 973
				log.Error("SnapShotKV MultiSave fail", zap.Error(err))
				panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
974 975
			}
		}
N
neza2017 已提交
976 977
	}

N
neza2017 已提交
978
	rstID := make([]typeutil.UniqueID, 0, 16)
979 980 981
	for _, segID := range segIDs {
		if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
			rstID = append(rstID, segID)
N
neza2017 已提交
982 983 984 985 986
		}
	}
	return rstID, fieldSchema, nil
}

987
func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
N
neza2017 已提交
988
	mt.ddLock.RLock()
S
sunby 已提交
989
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
990 991 992

	collID, ok := mt.collName2ID[collName]
	if !ok {
993
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
994 995 996
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
997
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
998 999
	}

N
neza2017 已提交
1000
	rstIndex := make([]pb.IndexInfo, 0, len(collMeta.FieldIndexes))
Z
zhenshan.cao 已提交
1001
	for _, idx := range collMeta.FieldIndexes {
1002 1003
		idxInfo, ok := mt.indexID2Meta[idx.IndexID]
		if !ok {
1004
			return pb.CollectionInfo{}, nil, fmt.Errorf("index id = %d not found", idx.IndexID)
1005 1006 1007
		}
		if indexName == "" || idxInfo.IndexName == indexName {
			rstIndex = append(rstIndex, idxInfo)
N
neza2017 已提交
1008 1009
		}
	}
1010
	return collMeta, rstIndex, nil
N
neza2017 已提交
1011
}
B
bigsheeper 已提交
1012 1013 1014

func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
	mt.ddLock.RLock()
S
sunby 已提交
1015
	defer mt.ddLock.RUnlock()
B
bigsheeper 已提交
1016 1017 1018

	indexInfo, ok := mt.indexID2Meta[indexID]
	if !ok {
S
sunby 已提交
1019
		return nil, fmt.Errorf("cannot find index, id = %d", indexID)
B
bigsheeper 已提交
1020 1021 1022
	}
	return &indexInfo, nil
}
1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048

func (mt *metaTable) dupMeta() (
	map[typeutil.UniqueID]pb.CollectionInfo,
	map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
	map[typeutil.UniqueID]pb.IndexInfo,
) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	collID2Meta := map[typeutil.UniqueID]pb.CollectionInfo{}
	segID2IndexMeta := map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo{}
	indexID2Meta := map[typeutil.UniqueID]pb.IndexInfo{}
	for k, v := range mt.collID2Meta {
		collID2Meta[k] = v
	}
	for k, v := range mt.segID2IndexMeta {
		segID2IndexMeta[k] = map[typeutil.UniqueID]pb.SegmentIndexInfo{}
		for k2, v2 := range v {
			segID2IndexMeta[k][k2] = v2
		}
	}
	for k, v := range mt.indexID2Meta {
		indexID2Meta[k] = v
	}
	return collID2Meta, segID2IndexMeta, indexID2Meta
}