meta_table.go 38.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package rootcoord
13 14

import (
S
sunby 已提交
15
	"fmt"
Z
zhenshan.cao 已提交
16
	"path"
17 18 19 20
	"strconv"
	"sync"

	"github.com/golang/protobuf/proto"
B
bigsheeper 已提交
21 22
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
29 30
)

Z
zhenshan.cao 已提交
31
const (
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
	// ComponentPrefix prefix for rootcoord component
	ComponentPrefix = "root-coord"

	// TenantMetaPrefix prefix for tenant meta
	TenantMetaPrefix = ComponentPrefix + "/tenant"

	// ProxyMetaPrefix prefix for proxy meta
	ProxyMetaPrefix = ComponentPrefix + "/proxy"

	// CollectionMetaPrefix prefix for collection meta
	CollectionMetaPrefix = ComponentPrefix + "/collection"

	// SegmentIndexMetaPrefix prefix for segment index meta
	SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"

	// IndexMetaPrefix prefix for index meta
	IndexMetaPrefix = ComponentPrefix + "/index"

	// CollectionAliasMetaPrefix prefix for collection alias meta
Y
Yusup 已提交
51
	CollectionAliasMetaPrefix = ComponentPrefix + "/collection-alias"
52

53
	// TimestampPrefix prefix for timestamp
54 55
	TimestampPrefix = ComponentPrefix + "/timestamp"

56
	// DDOperationPrefix prefix for DD operation
57
	DDOperationPrefix = ComponentPrefix + "/dd-operation"
58

59 60 61 62
	// DDMsgSendPrefix prefix to indicate whether DD msg has been send
	DDMsgSendPrefix = ComponentPrefix + "/dd-msg-send"

	// CreateCollectionDDType name of DD type for create collection
63
	CreateCollectionDDType = "CreateCollection"
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81

	// DropCollectionDDType name of DD type for drop collection
	DropCollectionDDType = "DropCollection"

	// CreatePartitionDDType name of DD type for create partition
	CreatePartitionDDType = "CreatePartition"

	// DropPartitionDDType name of DD type for drop partition
	DropPartitionDDType = "DropPartition"

	// CreateAliasDDType name of DD type for create collection alias
	CreateAliasDDType = "CreateAlias"

	// DropAliasDDType name of DD type for drop collection alias
	DropAliasDDType = "DropAlias"

	// AlterAliasDDType name of DD type for alter collection alias
	AlterAliasDDType = "AlterAlias"
Z
zhenshan.cao 已提交
82 83
)

84 85
// MetaTable store all rootcoord meta info
type MetaTable struct {
86 87 88 89 90
	client          kv.SnapShotKV                                                   // client of a reliable kv service, i.e. etcd client
	tenantID2Meta   map[typeutil.UniqueID]pb.TenantMeta                             // tenant id to tenant meta
	proxyID2Meta    map[typeutil.UniqueID]pb.ProxyMeta                              // proxy id to proxy meta
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection name to collection id
Y
Yusup 已提交
91
	collAlias2ID    map[string]typeutil.UniqueID                                    // collection alias to collection id
92
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
93 94
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // collection_id/index_id -> meta
95 96 97 98 99 100

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
}

101 102 103 104
// NewMetaTable create meta table for rootcoord, which stores all in-memory information
// for collection, partion, segment, index etc.
func NewMetaTable(kv kv.SnapShotKV) (*MetaTable, error) {
	mt := &MetaTable{
105 106 107 108 109 110 111 112 113 114 115 116
		client:     kv,
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

117
func (mt *MetaTable) reloadFromKV() error {
118 119
	mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
	mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
Z
zhenshan.cao 已提交
120
	mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
121
	mt.collName2ID = make(map[string]typeutil.UniqueID)
Y
Yusup 已提交
122
	mt.collAlias2ID = make(map[string]typeutil.UniqueID)
123
	mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
124
	mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
Z
zhenshan.cao 已提交
125
	mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
126

127
	_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
128 129 130 131 132 133
	if err != nil {
		return err
	}

	for _, value := range values {
		tenantMeta := pb.TenantMeta{}
134
		err := proto.Unmarshal([]byte(value), &tenantMeta)
135
		if err != nil {
136
			return fmt.Errorf("RootCoord Unmarshal pb.TenantMeta err:%w", err)
137 138 139 140
		}
		mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
	}

141
	_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
142 143 144 145 146 147
	if err != nil {
		return err
	}

	for _, value := range values {
		proxyMeta := pb.ProxyMeta{}
148
		err = proto.Unmarshal([]byte(value), &proxyMeta)
149
		if err != nil {
150
			return fmt.Errorf("RootCoord Unmarshal pb.ProxyMeta err:%w", err)
151 152 153 154
		}
		mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
	}

155
	_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
156 157 158 159 160
	if err != nil {
		return err
	}

	for _, value := range values {
161
		collInfo := pb.CollectionInfo{}
162
		err = proto.Unmarshal([]byte(value), &collInfo)
163
		if err != nil {
164
			return fmt.Errorf("RootCoord Unmarshal pb.CollectionInfo err:%w", err)
165
		}
166 167
		mt.collID2Meta[collInfo.ID] = collInfo
		mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
168 169
	}

170
	_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
171 172 173
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
174 175
	for _, value := range values {
		segmentIndexInfo := pb.SegmentIndexInfo{}
176
		err = proto.Unmarshal([]byte(value), &segmentIndexInfo)
177
		if err != nil {
178
			return fmt.Errorf("RootCoord Unmarshal pb.SegmentIndexInfo err:%w", err)
179
		}
180 181 182 183 184 185 186 187 188 189 190 191

		// update partID2SegID
		segIDMap, ok := mt.partID2SegID[segmentIndexInfo.PartitionID]
		if ok {
			segIDMap[segmentIndexInfo.SegmentID] = true
		} else {
			idMap := make(map[typeutil.UniqueID]bool)
			idMap[segmentIndexInfo.SegmentID] = true
			mt.partID2SegID[segmentIndexInfo.PartitionID] = idMap
		}

		// update segID2IndexMeta
Z
zhenshan.cao 已提交
192
		idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
193
		if ok {
194
			idx[segmentIndexInfo.IndexID] = segmentIndexInfo
Z
zhenshan.cao 已提交
195 196 197
		} else {
			meta := make(map[typeutil.UniqueID]pb.SegmentIndexInfo)
			meta[segmentIndexInfo.IndexID] = segmentIndexInfo
198
			mt.segID2IndexMeta[segmentIndexInfo.SegmentID] = meta
199 200 201
		}
	}

202
	_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
Z
zhenshan.cao 已提交
203 204
	if err != nil {
		return err
205
	}
Z
zhenshan.cao 已提交
206 207
	for _, value := range values {
		meta := pb.IndexInfo{}
208
		err = proto.Unmarshal([]byte(value), &meta)
Z
zhenshan.cao 已提交
209
		if err != nil {
210
			return fmt.Errorf("RootCoord Unmarshal pb.IndexInfo err:%w", err)
211
		}
Z
zhenshan.cao 已提交
212
		mt.indexID2Meta[meta.IndexID] = meta
213 214
	}

Y
Yusup 已提交
215 216 217 218 219 220
	_, values, err = mt.client.LoadWithPrefix(CollectionAliasMetaPrefix, 0)
	if err != nil {
		return err
	}
	for _, value := range values {
		aliasInfo := pb.CollectionInfo{}
221
		err = proto.Unmarshal([]byte(value), &aliasInfo)
Y
Yusup 已提交
222
		if err != nil {
223
			return fmt.Errorf("RootCoord Unmarshal pb.AliasInfo err:%w", err)
Y
Yusup 已提交
224 225 226 227
		}
		mt.collAlias2ID[aliasInfo.Schema.Name] = aliasInfo.ID
	}

228
	log.Debug("reload meta table from KV successfully")
Z
zhenshan.cao 已提交
229
	return nil
230 231
}

232
func (mt *MetaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
N
neza2017 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245
	if op == nil {
		return nil
	}
	meta[DDMsgSendPrefix] = "false"
	return func(ts typeutil.Timestamp) (string, string, error) {
		val, err := op(ts)
		if err != nil {
			return "", "", err
		}
		return DDOperationPrefix, val, nil
	}
}

246 247
// AddTenant add tenant
func (mt *MetaTable) AddTenant(te *pb.TenantMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
248 249 250 251
	mt.tenantLock.Lock()
	defer mt.tenantLock.Unlock()

	k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
252
	v, _ := proto.Marshal(te)
N
neza2017 已提交
253

254
	err := mt.client.Save(k, string(v), ts)
255
	if err != nil {
256 257
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
258 259
	}
	mt.tenantID2Meta[te.ID] = *te
260
	return nil
N
neza2017 已提交
261 262
}

263 264
// AddProxy add proxy
func (mt *MetaTable) AddProxy(po *pb.ProxyMeta, ts typeutil.Timestamp) error {
N
neza2017 已提交
265 266 267 268
	mt.proxyLock.Lock()
	defer mt.proxyLock.Unlock()

	k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
269
	v, _ := proto.Marshal(po)
N
neza2017 已提交
270

271
	err := mt.client.Save(k, string(v), ts)
272
	if err != nil {
273 274
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
275 276
	}
	mt.proxyID2Meta[po.ID] = *po
277
	return nil
N
neza2017 已提交
278 279
}

280 281
// AddCollection add collection
func (mt *MetaTable) AddCollection(coll *pb.CollectionInfo, ts typeutil.Timestamp, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
282 283
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
Z
zhenshan.cao 已提交
284

285 286
	if len(coll.PartitionIDs) != len(coll.PartitionNames) ||
		len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) ||
287
		(len(coll.PartitionIDs) != 1 && len(coll.PartitionIDs) != 0) {
288
		return fmt.Errorf("PartitionIDs, PartitionNames and PartitionCreatedTimestmaps' length mis-match when creating collection")
289
	}
290
	if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
291
		return fmt.Errorf("collection %s exist", coll.Schema.Name)
292
	}
N
neza2017 已提交
293
	if len(coll.FieldIndexes) != len(idx) {
294
		return fmt.Errorf("incorrect index id when creating collection")
N
neza2017 已提交
295
	}
296

N
neza2017 已提交
297 298 299
	for _, i := range idx {
		mt.indexID2Meta[i.IndexID] = *i
	}
Z
zhenshan.cao 已提交
300

301
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
302

N
neza2017 已提交
303
	for _, i := range idx {
N
neza2017 已提交
304
		k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
305 306
		v, _ := proto.Marshal(i)
		meta[k] = string(v)
N
neza2017 已提交
307 308
	}

309
	// save ddOpStr into etcd
N
neza2017 已提交
310
	addition := mt.getAdditionKV(ddOpStr, meta)
311 312 313 314 315 316 317 318
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.CreateTime = ts
		if len(coll.PartitionCreatedTimestamps) == 1 {
			coll.PartitionCreatedTimestamps[0] = ts
		}
		mt.collID2Meta[coll.ID] = *coll
		mt.collName2ID[coll.Schema.Name] = coll.ID
		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
319 320 321
		v1, _ := proto.Marshal(coll)
		meta[k1] = string(v1)
		return k1, string(v1), nil
322 323
	}

324
	err := mt.client.MultiSave(meta, ts, addition, saveColl)
325
	if err != nil {
326 327
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
328
	}
329

330
	return nil
331 332
}

333 334
// DeleteCollection delete collection
func (mt *MetaTable) DeleteCollection(collID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
335 336 337 338 339
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
340
		return fmt.Errorf("can't find collection. id = %d", collID)
341 342
	}

Z
zhenshan.cao 已提交
343 344
	delete(mt.collID2Meta, collID)
	delete(mt.collName2ID, collMeta.Schema.Name)
345 346 347 348 349 350 351

	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				delete(mt.segID2IndexMeta, segID)
			}
Z
zhenshan.cao 已提交
352 353
		}
	}
354 355 356 357 358 359

	// update partID2SegID
	for partID := range collMeta.PartitionIDs {
		delete(mt.partID2SegID, typeutil.UniqueID(partID))
	}

N
neza2017 已提交
360 361 362 363 364 365 366 367
	for _, idxInfo := range collMeta.FieldIndexes {
		_, ok := mt.indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
			continue
		}
		delete(mt.indexID2Meta, idxInfo.IndexID)
	}
Y
Yusup 已提交
368 369 370 371 372 373 374
	var aliases []string
	// delete collection aliases
	for alias, cid := range mt.collAlias2ID {
		if cid == collID {
			aliases = append(aliases, alias)
		}
	}
375

376
	delMetakeys := []string{
N
neza2017 已提交
377 378 379 380
		fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
		fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
		fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
	}
381

Y
Yusup 已提交
382 383 384 385 386 387 388
	for _, alias := range aliases {
		delete(mt.collAlias2ID, alias)
		delMetakeys = append(delMetakeys,
			fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, alias),
		)
	}

389
	// save ddOpStr into etcd
N
neza2017 已提交
390 391
	var saveMeta = map[string]string{}
	addition := mt.getAdditionKV(ddOpStr, saveMeta)
392
	err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, ts, addition)
393
	if err != nil {
394 395
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
396 397
	}

398
	return nil
399 400
}

401 402
// HasCollection return collection existence
func (mt *MetaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
403 404
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
405 406 407 408 409 410 411
	if ts == 0 {
		_, ok := mt.collID2Meta[collID]
		return ok
	}
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	_, err := mt.client.Load(key, ts)
	return err == nil
412 413
}

414 415
// GetCollectionByID return collection meta by collection id
func (mt *MetaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
N
neza2017 已提交
416 417 418
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

419 420 421 422 423 424 425
	if ts == 0 {
		col, ok := mt.collID2Meta[collectionID]
		if !ok {
			return nil, fmt.Errorf("can't find collection id : %d", collectionID)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
426
	}
427 428 429 430 431 432
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
	val, err := mt.client.Load(key, ts)
	if err != nil {
		return nil, err
	}
	colMeta := pb.CollectionInfo{}
433
	err = proto.Unmarshal([]byte(val), &colMeta)
434 435 436 437
	if err != nil {
		return nil, err
	}
	return &colMeta, nil
N
neza2017 已提交
438 439
}

440 441
// GetCollectionByName return collection meta by collection name
func (mt *MetaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
442 443 444
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

445 446 447
	if ts == 0 {
		vid, ok := mt.collName2ID[collectionName]
		if !ok {
Y
Yusup 已提交
448 449 450
			if vid, ok = mt.collAlias2ID[collectionName]; !ok {
				return nil, fmt.Errorf("can't find collection: " + collectionName)
			}
451 452 453
		}
		col, ok := mt.collID2Meta[vid]
		if !ok {
S
sunby 已提交
454
			return nil, fmt.Errorf("can't find collection %s with id %d", collectionName, vid)
455 456 457
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
458
	}
459 460 461
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
		return nil, err
N
neza2017 已提交
462
	}
463 464
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
465
		err = proto.Unmarshal([]byte(val), &collMeta)
466 467 468 469 470 471 472 473 474
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
			continue
		}
		if collMeta.Schema.Name == collectionName {
			return &collMeta, nil
		}
	}
	return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
N
neza2017 已提交
475 476
}

477 478
// ListCollections list all collection names
func (mt *MetaTable) ListCollections(ts typeutil.Timestamp) (map[string]*pb.CollectionInfo, error) {
479 480
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
481
	colls := make(map[string]*pb.CollectionInfo)
482

483
	if ts == 0 {
484 485 486 487
		for collName, collID := range mt.collName2ID {
			coll := mt.collID2Meta[collID]
			colCopy := proto.Clone(&coll)
			colls[collName] = colCopy.(*pb.CollectionInfo)
N
neza2017 已提交
488 489
		}
		return colls, nil
490 491 492
	}
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
N
neza2017 已提交
493
		log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
494
		return nil, nil
495 496 497
	}
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
498
		err := proto.Unmarshal([]byte(val), &collMeta)
499 500 501
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
		}
502
		colls[collMeta.Schema.Name] = &collMeta
503 504 505 506
	}
	return colls, nil
}

507 508
// ListAliases list all collection aliases
func (mt *MetaTable) ListAliases(collID typeutil.UniqueID) []string {
509 510 511 512 513 514 515 516 517 518 519
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	var aliases []string
	for alias, cid := range mt.collAlias2ID {
		if cid == collID {
			aliases = append(aliases, alias)
		}
	}
	return aliases
}

520 521
// ListCollectionVirtualChannels list virtual channels of all collections
func (mt *MetaTable) ListCollectionVirtualChannels() []string {
522 523 524 525 526 527 528 529 530 531
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	vlist := []string{}

	for _, c := range mt.collID2Meta {
		vlist = append(vlist, c.VirtualChannelNames...)
	}
	return vlist
}

532 533
// ListCollectionPhysicalChannels list physical channels of all collections
func (mt *MetaTable) ListCollectionPhysicalChannels() []string {
534 535 536 537 538 539 540 541 542 543
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
	plist := []string{}

	for _, c := range mt.collID2Meta {
		plist = append(plist, c.PhysicalChannelNames...)
	}
	return plist
}

544 545
// AddPartition add partition
func (mt *MetaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
546 547 548 549
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collID2Meta[collID]
	if !ok {
550
		return fmt.Errorf("can't find collection. id = %d", collID)
551 552 553
	}

	// number of partition tags (except _default) should be limited to 4096 by default
N
neza2017 已提交
554
	if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
555
		return fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
556
	}
557

558
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
559
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
560 561
	}

562
	if len(coll.PartitionIDs) != len(coll.PartitionCreatedTimestamps) {
563
		return fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionIDs), len(coll.PartitionCreatedTimestamps))
564 565 566
	}

	if len(coll.PartitionNames) != len(coll.PartitionCreatedTimestamps) {
567
		return fmt.Errorf("len(coll.PartitionNames)=%d, len(coll.PartitionCreatedTimestamps)=%d", len(coll.PartitionNames), len(coll.PartitionCreatedTimestamps))
568 569
	}

570 571
	for idx := range coll.PartitionIDs {
		if coll.PartitionIDs[idx] == partitionID {
572
			return fmt.Errorf("partition id = %d already exists", partitionID)
Z
zhenshan.cao 已提交
573
		}
574
		if coll.PartitionNames[idx] == partitionName {
575
			return fmt.Errorf("partition name = %s already exists", partitionName)
576
		}
577
		// no necessary to check created timestamp
578
	}
579
	meta := make(map[string]string)
Z
zhenshan.cao 已提交
580

581
	// save ddOpStr into etcd
N
neza2017 已提交
582
	addition := mt.getAdditionKV(ddOpStr, meta)
583

584 585 586 587 588 589 590
	saveColl := func(ts typeutil.Timestamp) (string, string, error) {
		coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
		coll.PartitionNames = append(coll.PartitionNames, partitionName)
		coll.PartitionCreatedTimestamps = append(coll.PartitionCreatedTimestamps, ts)
		mt.collID2Meta[collID] = coll

		k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
591 592
		v1, _ := proto.Marshal(&coll)
		meta[k1] = string(v1)
593

594
		return k1, string(v1), nil
595 596
	}

597
	err := mt.client.MultiSave(meta, ts, addition, saveColl)
598
	if err != nil {
599 600
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
601
	}
602
	return nil
603 604
}

605 606
// GetPartitionNameByID return partition name by partition id
func (mt *MetaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
607
	if ts == 0 {
608 609
		mt.ddLock.RLock()
		defer mt.ddLock.RUnlock()
610 611
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
612
			return "", fmt.Errorf("can't find collection id = %d", collID)
613
		}
614 615
		for idx := range collMeta.PartitionIDs {
			if collMeta.PartitionIDs[idx] == partitionID {
616
				return collMeta.PartitionNames[idx], nil
617 618
			}
		}
619
		return "", fmt.Errorf("partition %d does not exist", partitionID)
620 621 622 623
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
624
		return "", err
625
	}
626
	collMeta := pb.CollectionInfo{}
627
	err = proto.Unmarshal([]byte(collVal), &collMeta)
628
	if err != nil {
629
		return "", err
630
	}
631 632
	for idx := range collMeta.PartitionIDs {
		if collMeta.PartitionIDs[idx] == partitionID {
633
			return collMeta.PartitionNames[idx], nil
634
		}
635 636 637 638
	}
	return "", fmt.Errorf("partition %d does not exist", partitionID)
}

639
func (mt *MetaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
640 641 642 643
	if ts == 0 {
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
			return 0, fmt.Errorf("can't find collection id = %d", collID)
644
		}
645
		for idx := range collMeta.PartitionIDs {
646
			if collMeta.PartitionNames[idx] == partitionName {
647 648 649 650 651 652 653 654 655 656 657
				return collMeta.PartitionIDs[idx], nil
			}
		}
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
		return 0, err
	}
	collMeta := pb.CollectionInfo{}
658
	err = proto.Unmarshal([]byte(collVal), &collMeta)
659 660 661 662
	if err != nil {
		return 0, err
	}
	for idx := range collMeta.PartitionIDs {
663
		if collMeta.PartitionNames[idx] == partitionName {
664
			return collMeta.PartitionIDs[idx], nil
665 666
		}
	}
667
	return 0, fmt.Errorf("partition %s does not exist", partitionName)
668 669
}

670 671
// GetPartitionByName return partition id by partition name
func (mt *MetaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
672 673
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
674
	return mt.getPartitionByName(collID, partitionName, ts)
675 676
}

677 678
// HasPartition check partition existence
func (mt *MetaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
679 680
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
681
	_, err := mt.getPartitionByName(collID, partitionName, ts)
682 683 684
	return err == nil
}

685 686
// DeletePartition delete partition
func (mt *MetaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.UniqueID, error) {
687 688 689
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

Z
zhenshan.cao 已提交
690
	if partitionName == Params.DefaultPartitionName {
691
		return 0, fmt.Errorf("default partition cannot be deleted")
692 693 694 695
	}

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
696
		return 0, fmt.Errorf("can't find collection id = %d", collID)
697 698 699 700 701 702
	}

	// check tag exists
	exist := false

	pd := make([]typeutil.UniqueID, 0, len(collMeta.PartitionIDs))
703
	pn := make([]string, 0, len(collMeta.PartitionNames))
704
	pts := make([]uint64, 0, len(collMeta.PartitionCreatedTimestamps))
705 706
	var partID typeutil.UniqueID
	for idx := range collMeta.PartitionIDs {
707
		if collMeta.PartitionNames[idx] == partitionName {
708 709 710 711
			partID = collMeta.PartitionIDs[idx]
			exist = true
		} else {
			pd = append(pd, collMeta.PartitionIDs[idx])
712
			pn = append(pn, collMeta.PartitionNames[idx])
713
			pts = append(pts, collMeta.PartitionCreatedTimestamps[idx])
714 715 716
		}
	}
	if !exist {
717
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
718
	}
Z
zhenshan.cao 已提交
719
	collMeta.PartitionIDs = pd
720
	collMeta.PartitionNames = pn
721
	collMeta.PartitionCreatedTimestamps = pts
Z
zhenshan.cao 已提交
722
	mt.collID2Meta[collID] = collMeta
723

724 725 726 727
	// update segID2IndexMeta and partID2SegID
	if segIDMap, ok := mt.partID2SegID[partID]; ok {
		for segID := range segIDMap {
			delete(mt.segID2IndexMeta, segID)
728 729
		}
	}
730 731
	delete(mt.partID2SegID, partID)

732 733 734
	k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
	v, _ := proto.Marshal(&collMeta)
	meta := map[string]string{k: string(v)}
735
	delMetaKeys := []string{}
N
neza2017 已提交
736
	for _, idxInfo := range collMeta.FieldIndexes {
737
		k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
N
neza2017 已提交
738 739 740
		delMetaKeys = append(delMetaKeys, k)
	}

741
	// save ddOpStr into etcd
N
neza2017 已提交
742
	addition := mt.getAdditionKV(ddOpStr, meta)
743

744
	err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, ts, addition)
745
	if err != nil {
746 747
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
748
	}
749
	return partID, nil
750 751
}

752 753
// AddIndex add index
func (mt *MetaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo, ts typeutil.Timestamp) error {
754 755
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
756

757
	collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
758
	if !ok {
759
		return fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
760
	}
761 762 763 764 765 766
	exist := false
	for _, fidx := range collMeta.FieldIndexes {
		if fidx.IndexID == segIdxInfo.IndexID {
			exist = true
			break
		}
767
	}
768
	if !exist {
769
		return fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
770
	}
771

772 773 774 775
	segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
	if !ok {
		idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
		mt.segID2IndexMeta[segIdxInfo.SegmentID] = idxMap
776 777 778

		segIDMap := map[typeutil.UniqueID]bool{segIdxInfo.SegmentID: true}
		mt.partID2SegID[segIdxInfo.PartitionID] = segIDMap
779 780 781 782 783
	} else {
		tmpInfo, ok := segIdxMap[segIdxInfo.IndexID]
		if ok {
			if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
				if segIdxInfo.BuildID == tmpInfo.BuildID {
784
					log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
785
					return nil
786
				}
787
				return fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
788
			}
789 790 791
		}
	}

792
	mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
793 794
	mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true

795
	k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
796
	v, _ := proto.Marshal(segIdxInfo)
N
neza2017 已提交
797

798
	err := mt.client.Save(k, string(v), ts)
N
neza2017 已提交
799
	if err != nil {
800 801
		log.Error("SnapShotKV Save fail", zap.Error(err))
		panic("SnapShotKV Save fail")
N
neza2017 已提交
802
	}
803

804
	return nil
N
neza2017 已提交
805 806
}

807 808
// DropIndex drop index
func (mt *MetaTable) DropIndex(collName, fieldName, indexName string, ts typeutil.Timestamp) (typeutil.UniqueID, bool, error) {
N
neza2017 已提交
809 810 811 812 813
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collID, ok := mt.collName2ID[collName]
	if !ok {
814
		return 0, false, fmt.Errorf("collection name = %s not exist", collName)
N
neza2017 已提交
815 816 817
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
818
		return 0, false, fmt.Errorf("collection name  = %s not has meta", collName)
N
neza2017 已提交
819 820 821
	}
	fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
	if err != nil {
822
		return 0, false, err
N
neza2017 已提交
823 824 825 826 827 828 829 830 831 832 833
	}
	fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
	var dropIdxID typeutil.UniqueID
	for i, info := range collMeta.FieldIndexes {
		if info.FiledID != fieldSch.FieldID {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		idxMeta, ok := mt.indexID2Meta[info.IndexID]
		if !ok {
			fieldIdxInfo = append(fieldIdxInfo, info)
N
neza2017 已提交
834
			log.Warn("index id not has meta", zap.Int64("index id", info.IndexID))
N
neza2017 已提交
835 836 837 838 839 840 841 842 843 844 845
			continue
		}
		if idxMeta.IndexName != indexName {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		dropIdxID = info.IndexID
		fieldIdxInfo = append(fieldIdxInfo, collMeta.FieldIndexes[i+1:]...)
		break
	}
	if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
N
neza2017 已提交
846
		log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
847
		return 0, false, nil
N
neza2017 已提交
848 849 850
	}
	collMeta.FieldIndexes = fieldIdxInfo
	mt.collID2Meta[collID] = collMeta
851 852 853
	k := path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10))
	v, _ := proto.Marshal(&collMeta)
	saveMeta := map[string]string{k: string(v)}
N
neza2017 已提交
854 855 856

	delete(mt.indexID2Meta, dropIdxID)

857 858 859 860 861 862 863
	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				if segIndexInfos, ok := mt.segID2IndexMeta[segID]; ok {
					delete(segIndexInfos, dropIdxID)
				}
N
neza2017 已提交
864 865 866
			}
		}
	}
867

N
neza2017 已提交
868 869 870 871
	delMeta := []string{
		fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
		fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
	}
N
neza2017 已提交
872

873
	err = mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, ts)
N
neza2017 已提交
874
	if err != nil {
875 876
		log.Error("SnapShotKV MultiSaveAndRemoveWithPrefix fail", zap.Error(err))
		panic("SnapShotKV MultiSaveAndRemoveWithPrefix fail")
N
neza2017 已提交
877 878
	}

879
	return dropIdxID, true, nil
N
neza2017 已提交
880 881
}

882 883
// GetSegmentIndexInfoByID return segment index info by segment id
func (mt *MetaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
N
neza2017 已提交
884 885 886 887 888
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	segIdxMap, ok := mt.segID2IndexMeta[segID]
	if !ok {
889 890 891 892 893 894 895
		return pb.SegmentIndexInfo{
			SegmentID:   segID,
			FieldID:     filedID,
			IndexID:     0,
			BuildID:     0,
			EnableIndex: false,
		}, nil
N
neza2017 已提交
896
	}
897
	if len(segIdxMap) == 0 {
S
sunby 已提交
898
		return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
N
neza2017 已提交
899 900
	}

B
bigsheeper 已提交
901
	if filedID == -1 && idxName == "" { // return default index
902
		for _, seg := range segIdxMap {
B
bigsheeper 已提交
903 904 905 906
			info, ok := mt.indexID2Meta[seg.IndexID]
			if ok && info.IndexName == Params.DefaultIndexName {
				return seg, nil
			}
N
neza2017 已提交
907 908
		}
	} else {
909
		for idxID, seg := range segIdxMap {
N
neza2017 已提交
910 911 912 913 914 915 916 917 918 919 920 921
			idxMeta, ok := mt.indexID2Meta[idxID]
			if ok {
				if idxMeta.IndexName != idxName {
					continue
				}
				if seg.FieldID != filedID {
					continue
				}
				return seg, nil
			}
		}
	}
S
sunby 已提交
922
	return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
N
neza2017 已提交
923 924
}

925 926
// GetFieldSchema return field schema
func (mt *MetaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
927 928 929
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

N
neza2017 已提交
930 931 932
	return mt.unlockGetFieldSchema(collName, fieldName)
}

933
func (mt *MetaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
934 935
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
936
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
937 938 939
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
940
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
941 942 943 944 945 946 947
	}

	for _, field := range collMeta.Schema.Fields {
		if field.Name == fieldName {
			return *field, nil
		}
	}
S
sunby 已提交
948
	return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
N
neza2017 已提交
949 950
}

951 952
// IsSegmentIndexed check if segment has index
func (mt *MetaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
953 954
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
955 956 957
	return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}

958
func (mt *MetaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
959 960 961 962 963
	segIdx, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return false
	}
	exist := false
964
	for idxID, meta := range segIdx {
N
neza2017 已提交
965 966 967 968 969 970 971 972 973 974 975 976 977 978 979
		if meta.FieldID != fieldSchema.FieldID {
			continue
		}
		idxMeta, ok := mt.indexID2Meta[idxID]
		if !ok {
			continue
		}
		if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
			exist = true
			break
		}
	}
	return exist
}

980 981
// GetNotIndexedSegments return segment ids which have no index
func (mt *MetaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID, ts typeutil.Timestamp) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
N
neza2017 已提交
982 983 984
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

N
neza2017 已提交
985 986 987
	if idxInfo.IndexParams == nil {
		return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
	}
N
neza2017 已提交
988 989
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
990
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
991 992 993
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
994
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
995
	}
N
neza2017 已提交
996
	fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
N
neza2017 已提交
997 998 999 1000
	if err != nil {
		return nil, fieldSchema, err
	}

N
neza2017 已提交
1001 1002
	var dupIdx typeutil.UniqueID = 0
	for _, f := range collMeta.FieldIndexes {
1003 1004 1005 1006
		if info, ok := mt.indexID2Meta[f.IndexID]; ok {
			if info.IndexName == idxInfo.IndexName {
				dupIdx = info.IndexID
				break
N
neza2017 已提交
1007 1008 1009
			}
		}
	}
N
neza2017 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025

	exist := false
	var existInfo pb.IndexInfo
	for _, f := range collMeta.FieldIndexes {
		if f.FiledID == fieldSchema.FieldID {
			existInfo, ok = mt.indexID2Meta[f.IndexID]
			if !ok {
				return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
			}
			if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
				exist = true
				break
			}
		}
	}
	if !exist {
N
neza2017 已提交
1026
		idx := &pb.FieldIndexInfo{
N
neza2017 已提交
1027 1028
			FiledID: fieldSchema.FieldID,
			IndexID: idxInfo.IndexID,
N
neza2017 已提交
1029 1030
		}
		collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
N
neza2017 已提交
1031 1032
		mt.collID2Meta[collMeta.ID] = collMeta
		k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
1033
		v1, _ := proto.Marshal(&collMeta)
N
neza2017 已提交
1034

N
neza2017 已提交
1035 1036
		mt.indexID2Meta[idx.IndexID] = *idxInfo
		k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
1037 1038
		v2, _ := proto.Marshal(idxInfo)
		meta := map[string]string{k1: string(v1), k2: string(v2)}
N
neza2017 已提交
1039

N
neza2017 已提交
1040 1041 1042 1043 1044
		if dupIdx != 0 {
			dupInfo := mt.indexID2Meta[dupIdx]
			dupInfo.IndexName = dupInfo.IndexName + "_bak"
			mt.indexID2Meta[dupIdx] = dupInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
1045 1046
			v, _ := proto.Marshal(&dupInfo)
			meta[k] = string(v)
N
neza2017 已提交
1047
		}
C
congqixia 已提交
1048
		err = mt.client.MultiSave(meta, ts)
N
neza2017 已提交
1049
		if err != nil {
1050 1051
			log.Error("SnapShotKV MultiSave fail", zap.Error(err))
			panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
1052
		}
N
neza2017 已提交
1053 1054 1055 1056 1057 1058
	} else {
		idxInfo.IndexID = existInfo.IndexID
		if existInfo.IndexName != idxInfo.IndexName { //replace index name
			existInfo.IndexName = idxInfo.IndexName
			mt.indexID2Meta[existInfo.IndexID] = existInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
1059 1060
			v, _ := proto.Marshal(&existInfo)
			meta := map[string]string{k: string(v)}
N
neza2017 已提交
1061 1062 1063 1064 1065
			if dupIdx != 0 {
				dupInfo := mt.indexID2Meta[dupIdx]
				dupInfo.IndexName = dupInfo.IndexName + "_bak"
				mt.indexID2Meta[dupIdx] = dupInfo
				k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
1066 1067
				v, _ := proto.Marshal(&dupInfo)
				meta[k] = string(v)
N
neza2017 已提交
1068 1069
			}

C
congqixia 已提交
1070
			err = mt.client.MultiSave(meta, ts)
N
neza2017 已提交
1071
			if err != nil {
1072 1073
				log.Error("SnapShotKV MultiSave fail", zap.Error(err))
				panic("SnapShotKV MultiSave fail")
N
neza2017 已提交
1074 1075
			}
		}
N
neza2017 已提交
1076 1077
	}

N
neza2017 已提交
1078
	rstID := make([]typeutil.UniqueID, 0, 16)
1079 1080 1081
	for _, segID := range segIDs {
		if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
			rstID = append(rstID, segID)
N
neza2017 已提交
1082 1083 1084 1085 1086
		}
	}
	return rstID, fieldSchema, nil
}

1087 1088
// GetIndexByName return index info by index name
func (mt *MetaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
N
neza2017 已提交
1089
	mt.ddLock.RLock()
S
sunby 已提交
1090
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
1091 1092 1093

	collID, ok := mt.collName2ID[collName]
	if !ok {
1094
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1095 1096 1097
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
1098
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
1099 1100
	}

N
neza2017 已提交
1101
	rstIndex := make([]pb.IndexInfo, 0, len(collMeta.FieldIndexes))
Z
zhenshan.cao 已提交
1102
	for _, idx := range collMeta.FieldIndexes {
1103 1104
		idxInfo, ok := mt.indexID2Meta[idx.IndexID]
		if !ok {
1105
			return pb.CollectionInfo{}, nil, fmt.Errorf("index id = %d not found", idx.IndexID)
1106 1107 1108
		}
		if indexName == "" || idxInfo.IndexName == indexName {
			rstIndex = append(rstIndex, idxInfo)
N
neza2017 已提交
1109 1110
		}
	}
1111
	return collMeta, rstIndex, nil
N
neza2017 已提交
1112
}
B
bigsheeper 已提交
1113

1114 1115
// GetIndexByID return index info by index id
func (mt *MetaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
B
bigsheeper 已提交
1116
	mt.ddLock.RLock()
S
sunby 已提交
1117
	defer mt.ddLock.RUnlock()
B
bigsheeper 已提交
1118 1119 1120

	indexInfo, ok := mt.indexID2Meta[indexID]
	if !ok {
S
sunby 已提交
1121
		return nil, fmt.Errorf("cannot find index, id = %d", indexID)
B
bigsheeper 已提交
1122 1123 1124
	}
	return &indexInfo, nil
}
1125

1126
func (mt *MetaTable) dupMeta() (
1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
	map[typeutil.UniqueID]pb.CollectionInfo,
	map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
	map[typeutil.UniqueID]pb.IndexInfo,
) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	collID2Meta := map[typeutil.UniqueID]pb.CollectionInfo{}
	segID2IndexMeta := map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo{}
	indexID2Meta := map[typeutil.UniqueID]pb.IndexInfo{}
	for k, v := range mt.collID2Meta {
		collID2Meta[k] = v
	}
	for k, v := range mt.segID2IndexMeta {
		segID2IndexMeta[k] = map[typeutil.UniqueID]pb.SegmentIndexInfo{}
		for k2, v2 := range v {
			segID2IndexMeta[k][k2] = v2
		}
	}
	for k, v := range mt.indexID2Meta {
		indexID2Meta[k] = v
	}
	return collID2Meta, segID2IndexMeta, indexID2Meta
}
Y
Yusup 已提交
1151

1152 1153
// AddAlias add collection alias
func (mt *MetaTable) AddAlias(collectionAlias string, collectionName string,
Y
Yusup 已提交
1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174
	ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; ok {
		return fmt.Errorf("duplicate collection alias, alias = %s", collectionAlias)
	}

	if _, ok := mt.collName2ID[collectionAlias]; ok {
		return fmt.Errorf("collection alias collides with existing collection name. collection = %s, alias = %s", collectionAlias, collectionAlias)
	}

	id, ok := mt.collName2ID[collectionName]
	if !ok {
		return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
	}
	mt.collAlias2ID[collectionAlias] = id

	meta := make(map[string]string)
	addition := mt.getAdditionKV(ddOpStr, meta)
	saveAlias := func(ts typeutil.Timestamp) (string, string, error) {
		k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
1175 1176 1177
		v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
		meta[k1] = string(v1)
		return k1, string(v1), nil
Y
Yusup 已提交
1178 1179 1180 1181 1182 1183 1184 1185 1186 1187
	}

	err := mt.client.MultiSave(meta, ts, addition, saveAlias)
	if err != nil {
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
	}
	return nil
}

1188 1189
// DeleteAlias delete collection alias
func (mt *MetaTable) DeleteAlias(collectionAlias string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
Y
Yusup 已提交
1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
		return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
	}
	delete(mt.collAlias2ID, collectionAlias)

	delMetakeys := []string{
		fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias),
	}
	meta := make(map[string]string)
	addition := mt.getAdditionKV(ddOpStr, meta)
	err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetakeys, ts, addition)
	if err != nil {
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
	}
	return nil
}

1210 1211
// AlterAlias alter collection alias
func (mt *MetaTable) AlterAlias(collectionAlias string, collectionName string, ts typeutil.Timestamp, ddOpStr func(ts typeutil.Timestamp) (string, error)) error {
Y
Yusup 已提交
1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	if _, ok := mt.collAlias2ID[collectionAlias]; !ok {
		return fmt.Errorf("alias does not exist, alias = %s", collectionAlias)
	}

	id, ok := mt.collName2ID[collectionName]
	if !ok {
		return fmt.Errorf("aliased collection name does not exist, name = %s", collectionName)
	}
	mt.collAlias2ID[collectionAlias] = id
	meta := make(map[string]string)
	addition := mt.getAdditionKV(ddOpStr, meta)
	alterAlias := func(ts typeutil.Timestamp) (string, string, error) {
		k1 := fmt.Sprintf("%s/%s", CollectionAliasMetaPrefix, collectionAlias)
1227 1228 1229
		v1, _ := proto.Marshal(&pb.CollectionInfo{ID: id, Schema: &schemapb.CollectionSchema{Name: collectionAlias}})
		meta[k1] = string(v1)
		return k1, string(v1), nil
Y
Yusup 已提交
1230 1231 1232 1233 1234 1235 1236 1237 1238
	}

	err := mt.client.MultiSave(meta, ts, addition, alterAlias)
	if err != nil {
		log.Error("SnapShotKV MultiSave fail", zap.Error(err))
		panic("SnapShotKV MultiSave fail")
	}
	return nil
}