meta_table.go 30.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package rootcoord
13 14

import (
S
sunby 已提交
15
	"fmt"
Z
zhenshan.cao 已提交
16
	"path"
17 18 19 20
	"strconv"
	"sync"

	"github.com/golang/protobuf/proto"
B
bigsheeper 已提交
21 22
	"go.uber.org/zap"

X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/kv"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
29 30
)

Z
zhenshan.cao 已提交
31
// Key prefixes under which rootcoord stores its metadata, and the names of
// the DD operation types.
const (
	// ComponentPrefix is the common root of every key prefix below.
	ComponentPrefix = "master-service"

	// Prefixes of the individual metadata kinds.
	TenantMetaPrefix       = ComponentPrefix + "/tenant"
	ProxyMetaPrefix        = ComponentPrefix + "/proxy"
	CollectionMetaPrefix   = ComponentPrefix + "/collection"
	SegmentIndexMetaPrefix = ComponentPrefix + "/segment-index"
	IndexMetaPrefix        = ComponentPrefix + "/index"

	// TimestampPrefix is the prefix reserved for timestamp keys.
	TimestampPrefix = ComponentPrefix + "/timestamp"

	// DDOperationPrefix is the key holding the serialized DD operation;
	// DDMsgSendPrefix is the key holding the "has the DD message been sent" flag.
	DDOperationPrefix = ComponentPrefix + "/dd-operation"
	DDMsgSendPrefix   = ComponentPrefix + "/dd-msg-send"

	// Names of the DD operation types.
	CreateCollectionDDType = "CreateCollection"
	DropCollectionDDType   = "DropCollection"
	CreatePartitionDDType  = "CreatePartition"
	DropPartitionDDType    = "DropPartition"
)

50
type metaTable struct {
51 52 53 54 55
	client          kv.SnapShotKV                                                   // client of a reliable kv service, i.e. etcd client
	tenantID2Meta   map[typeutil.UniqueID]pb.TenantMeta                             // tenant id to tenant meta
	proxyID2Meta    map[typeutil.UniqueID]pb.ProxyMeta                              // proxy id to proxy meta
	collID2Meta     map[typeutil.UniqueID]pb.CollectionInfo                         // collection_id -> meta
	collName2ID     map[string]typeutil.UniqueID                                    // collection name to collection id
56
	partID2SegID    map[typeutil.UniqueID]map[typeutil.UniqueID]bool                // partition_id -> segment_id -> bool
57 58
	segID2IndexMeta map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo // collection_id/index_id/partition_id/segment_id -> meta
	indexID2Meta    map[typeutil.UniqueID]pb.IndexInfo                              // collection_id/index_id -> meta
59 60 61 62 63 64

	tenantLock sync.RWMutex
	proxyLock  sync.RWMutex
	ddLock     sync.RWMutex
}

65
func NewMetaTable(kv kv.SnapShotKV) (*metaTable, error) {
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	mt := &metaTable{
		client:     kv,
		tenantLock: sync.RWMutex{},
		proxyLock:  sync.RWMutex{},
		ddLock:     sync.RWMutex{},
	}
	err := mt.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return mt, nil
}

func (mt *metaTable) reloadFromKV() error {

	mt.tenantID2Meta = make(map[typeutil.UniqueID]pb.TenantMeta)
	mt.proxyID2Meta = make(map[typeutil.UniqueID]pb.ProxyMeta)
Z
zhenshan.cao 已提交
83
	mt.collID2Meta = make(map[typeutil.UniqueID]pb.CollectionInfo)
84
	mt.collName2ID = make(map[string]typeutil.UniqueID)
85
	mt.partID2SegID = make(map[typeutil.UniqueID]map[typeutil.UniqueID]bool)
86
	mt.segID2IndexMeta = make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo)
Z
zhenshan.cao 已提交
87
	mt.indexID2Meta = make(map[typeutil.UniqueID]pb.IndexInfo)
88

89
	_, values, err := mt.client.LoadWithPrefix(TenantMetaPrefix, 0)
90 91 92 93 94 95 96 97
	if err != nil {
		return err
	}

	for _, value := range values {
		tenantMeta := pb.TenantMeta{}
		err := proto.UnmarshalText(value, &tenantMeta)
		if err != nil {
C
Cai Yudong 已提交
98
			return fmt.Errorf("RootCoord UnmarshalText pb.TenantMeta err:%w", err)
99 100 101 102
		}
		mt.tenantID2Meta[tenantMeta.ID] = tenantMeta
	}

103
	_, values, err = mt.client.LoadWithPrefix(ProxyMetaPrefix, 0)
104 105 106 107 108 109 110 111
	if err != nil {
		return err
	}

	for _, value := range values {
		proxyMeta := pb.ProxyMeta{}
		err = proto.UnmarshalText(value, &proxyMeta)
		if err != nil {
C
Cai Yudong 已提交
112
			return fmt.Errorf("RootCoord UnmarshalText pb.ProxyMeta err:%w", err)
113 114 115 116
		}
		mt.proxyID2Meta[proxyMeta.ID] = proxyMeta
	}

117
	_, values, err = mt.client.LoadWithPrefix(CollectionMetaPrefix, 0)
118 119 120 121 122
	if err != nil {
		return err
	}

	for _, value := range values {
123 124
		collInfo := pb.CollectionInfo{}
		err = proto.UnmarshalText(value, &collInfo)
125
		if err != nil {
C
Cai Yudong 已提交
126
			return fmt.Errorf("RootCoord UnmarshalText pb.CollectionInfo err:%w", err)
127
		}
128 129
		mt.collID2Meta[collInfo.ID] = collInfo
		mt.collName2ID[collInfo.Schema.Name] = collInfo.ID
130 131
	}

132
	_, values, err = mt.client.LoadWithPrefix(SegmentIndexMetaPrefix, 0)
133 134 135
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
136 137 138
	for _, value := range values {
		segmentIndexInfo := pb.SegmentIndexInfo{}
		err = proto.UnmarshalText(value, &segmentIndexInfo)
139
		if err != nil {
C
Cai Yudong 已提交
140
			return fmt.Errorf("RootCoord UnmarshalText pb.SegmentIndexInfo err:%w", err)
141
		}
142 143 144 145 146 147 148 149 150 151 152 153

		// update partID2SegID
		segIDMap, ok := mt.partID2SegID[segmentIndexInfo.PartitionID]
		if ok {
			segIDMap[segmentIndexInfo.SegmentID] = true
		} else {
			idMap := make(map[typeutil.UniqueID]bool)
			idMap[segmentIndexInfo.SegmentID] = true
			mt.partID2SegID[segmentIndexInfo.PartitionID] = idMap
		}

		// update segID2IndexMeta
Z
zhenshan.cao 已提交
154
		idx, ok := mt.segID2IndexMeta[segmentIndexInfo.SegmentID]
155
		if ok {
156
			idx[segmentIndexInfo.IndexID] = segmentIndexInfo
Z
zhenshan.cao 已提交
157 158 159
		} else {
			meta := make(map[typeutil.UniqueID]pb.SegmentIndexInfo)
			meta[segmentIndexInfo.IndexID] = segmentIndexInfo
160
			mt.segID2IndexMeta[segmentIndexInfo.SegmentID] = meta
161 162 163
		}
	}

164
	_, values, err = mt.client.LoadWithPrefix(IndexMetaPrefix, 0)
Z
zhenshan.cao 已提交
165 166
	if err != nil {
		return err
167
	}
Z
zhenshan.cao 已提交
168 169 170 171
	for _, value := range values {
		meta := pb.IndexInfo{}
		err = proto.UnmarshalText(value, &meta)
		if err != nil {
C
Cai Yudong 已提交
172
			return fmt.Errorf("RootCoord UnmarshalText pb.IndexInfo err:%w", err)
173
		}
Z
zhenshan.cao 已提交
174
		mt.indexID2Meta[meta.IndexID] = meta
175 176
	}

Z
zhenshan.cao 已提交
177
	return nil
178 179
}

N
neza2017 已提交
180 181 182 183 184 185 186 187 188 189 190 191 192 193
// getAdditionKV adapts a DD-operation generator into the "additional key/value"
// callback accepted by the KV client's multi-save calls.
//
// When op is nil it returns nil and leaves meta untouched. Otherwise it records
// DDMsgSendPrefix = "false" in meta and returns a function that, for a given
// timestamp, yields (DDOperationPrefix, op(ts)) or forwards op's error.
func (mt *metaTable) getAdditionKV(op func(ts typeutil.Timestamp) (string, error), meta map[string]string) func(ts typeutil.Timestamp) (string, string, error) {
	if op == nil {
		return nil
	}
	meta[DDMsgSendPrefix] = "false"

	wrapped := func(ts typeutil.Timestamp) (string, string, error) {
		payload, opErr := op(ts)
		if opErr == nil {
			return DDOperationPrefix, payload, nil
		}
		return "", "", opErr
	}
	return wrapped
}

194
func (mt *metaTable) AddTenant(te *pb.TenantMeta) (typeutil.Timestamp, error) {
N
neza2017 已提交
195 196 197 198 199 200
	mt.tenantLock.Lock()
	defer mt.tenantLock.Unlock()

	k := fmt.Sprintf("%s/%d", TenantMetaPrefix, te.ID)
	v := proto.MarshalTextString(te)

201 202 203
	ts, err := mt.client.Save(k, v)
	if err != nil {
		return 0, err
N
neza2017 已提交
204 205
	}
	mt.tenantID2Meta[te.ID] = *te
206
	return ts, nil
N
neza2017 已提交
207 208
}

209
func (mt *metaTable) AddProxy(po *pb.ProxyMeta) (typeutil.Timestamp, error) {
N
neza2017 已提交
210 211 212 213 214 215
	mt.proxyLock.Lock()
	defer mt.proxyLock.Unlock()

	k := fmt.Sprintf("%s/%d", ProxyMetaPrefix, po.ID)
	v := proto.MarshalTextString(po)

216 217 218
	ts, err := mt.client.Save(k, v)
	if err != nil {
		return 0, err
N
neza2017 已提交
219 220
	}
	mt.proxyID2Meta[po.ID] = *po
221
	return ts, nil
N
neza2017 已提交
222 223
}

224
func (mt *metaTable) AddCollection(coll *pb.CollectionInfo, idx []*pb.IndexInfo, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
225 226
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
Z
zhenshan.cao 已提交
227

228 229
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
		return 0, fmt.Errorf("PartitionIDs and PartitionNames' length mis-match when creating collection")
230
	}
231
	if _, ok := mt.collName2ID[coll.Schema.Name]; ok {
232
		return 0, fmt.Errorf("collection %s exist", coll.Schema.Name)
233
	}
N
neza2017 已提交
234
	if len(coll.FieldIndexes) != len(idx) {
235
		return 0, fmt.Errorf("incorrect index id when creating collection")
N
neza2017 已提交
236
	}
237

Z
zhenshan.cao 已提交
238 239
	mt.collID2Meta[coll.ID] = *coll
	mt.collName2ID[coll.Schema.Name] = coll.ID
N
neza2017 已提交
240 241 242
	for _, i := range idx {
		mt.indexID2Meta[i.IndexID] = *i
	}
Z
zhenshan.cao 已提交
243

N
neza2017 已提交
244
	k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, coll.ID)
Z
zhenshan.cao 已提交
245
	v1 := proto.MarshalTextString(coll)
246
	meta := map[string]string{k1: v1}
Z
zhenshan.cao 已提交
247

N
neza2017 已提交
248
	for _, i := range idx {
N
neza2017 已提交
249
		k := fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, coll.ID, i.IndexID)
N
neza2017 已提交
250 251 252 253
		v := proto.MarshalTextString(i)
		meta[k] = v
	}

254
	// save ddOpStr into etcd
N
neza2017 已提交
255 256
	addition := mt.getAdditionKV(ddOpStr, meta)
	ts, err := mt.client.MultiSave(meta, addition)
257 258
	if err != nil {
		_ = mt.reloadFromKV()
259
		return 0, err
260
	}
261

262
	return ts, nil
263 264
}

265
func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
266 267 268 269 270
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
271
		return 0, fmt.Errorf("can't find collection. id = %d", collID)
272 273
	}

Z
zhenshan.cao 已提交
274 275
	delete(mt.collID2Meta, collID)
	delete(mt.collName2ID, collMeta.Schema.Name)
276 277 278 279 280 281 282

	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				delete(mt.segID2IndexMeta, segID)
			}
Z
zhenshan.cao 已提交
283 284
		}
	}
285 286 287 288 289 290

	// update partID2SegID
	for partID := range collMeta.PartitionIDs {
		delete(mt.partID2SegID, typeutil.UniqueID(partID))
	}

N
neza2017 已提交
291 292 293 294 295 296 297 298
	for _, idxInfo := range collMeta.FieldIndexes {
		_, ok := mt.indexID2Meta[idxInfo.IndexID]
		if !ok {
			log.Warn("index id not exist", zap.Int64("index id", idxInfo.IndexID))
			continue
		}
		delete(mt.indexID2Meta, idxInfo.IndexID)
	}
299

300
	delMetakeys := []string{
N
neza2017 已提交
301 302 303 304
		fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID),
		fmt.Sprintf("%s/%d", SegmentIndexMetaPrefix, collID),
		fmt.Sprintf("%s/%d", IndexMetaPrefix, collID),
	}
305

306
	// save ddOpStr into etcd
N
neza2017 已提交
307 308 309
	var saveMeta = map[string]string{}
	addition := mt.getAdditionKV(ddOpStr, saveMeta)
	ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMetakeys, addition)
310 311
	if err != nil {
		_ = mt.reloadFromKV()
312
		return 0, err
313 314
	}

315
	return ts, nil
316 317
}

318
func (mt *metaTable) HasCollection(collID typeutil.UniqueID, ts typeutil.Timestamp) bool {
319 320
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
321 322 323 324 325 326 327
	if ts == 0 {
		_, ok := mt.collID2Meta[collID]
		return ok
	}
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	_, err := mt.client.Load(key, ts)
	return err == nil
328 329
}

330
func (mt *metaTable) GetCollectionByID(collectionID typeutil.UniqueID, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
N
neza2017 已提交
331 332 333
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

334 335 336 337 338 339 340
	if ts == 0 {
		col, ok := mt.collID2Meta[collectionID]
		if !ok {
			return nil, fmt.Errorf("can't find collection id : %d", collectionID)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
341
	}
342 343 344 345 346 347 348 349 350 351 352
	key := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collectionID)
	val, err := mt.client.Load(key, ts)
	if err != nil {
		return nil, err
	}
	colMeta := pb.CollectionInfo{}
	err = proto.UnmarshalText(val, &colMeta)
	if err != nil {
		return nil, err
	}
	return &colMeta, nil
N
neza2017 已提交
353 354
}

355
func (mt *metaTable) GetCollectionByName(collectionName string, ts typeutil.Timestamp) (*pb.CollectionInfo, error) {
356 357 358
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

359 360 361 362 363 364 365 366 367 368 369
	if ts == 0 {
		vid, ok := mt.collName2ID[collectionName]
		if !ok {
			return nil, fmt.Errorf("can't find collection: " + collectionName)
		}
		col, ok := mt.collID2Meta[vid]
		if !ok {
			return nil, fmt.Errorf("can't find collection: " + collectionName)
		}
		colCopy := proto.Clone(&col)
		return colCopy.(*pb.CollectionInfo), nil
N
neza2017 已提交
370
	}
371 372 373
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
		return nil, err
N
neza2017 已提交
374
	}
375 376 377 378 379 380 381 382 383 384 385 386
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
		err = proto.UnmarshalText(val, &collMeta)
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
			continue
		}
		if collMeta.Schema.Name == collectionName {
			return &collMeta, nil
		}
	}
	return nil, fmt.Errorf("can't find collection: %s, at timestamp = %d", collectionName, ts)
N
neza2017 已提交
387 388
}

389
func (mt *metaTable) ListCollections(ts typeutil.Timestamp) (map[string]typeutil.UniqueID, error) {
390 391
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
392
	colls := make(map[string]typeutil.UniqueID)
393

394
	if ts == 0 {
N
neza2017 已提交
395 396 397 398
		for k, v := range mt.collName2ID {
			colls[k] = v
		}
		return colls, nil
399 400 401
	}
	_, vals, err := mt.client.LoadWithPrefix(CollectionMetaPrefix, ts)
	if err != nil {
N
neza2017 已提交
402
		log.Debug("load with prefix error", zap.Uint64("timestamp", ts), zap.Error(err))
403
		return nil, nil
404 405 406 407 408 409 410
	}
	for _, val := range vals {
		collMeta := pb.CollectionInfo{}
		err := proto.UnmarshalText(val, &collMeta)
		if err != nil {
			log.Debug("unmarshal collection info failed", zap.Error(err))
		}
411
		colls[collMeta.Schema.Name] = collMeta.ID
412 413 414 415
	}
	return colls, nil
}

416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
// ListCollectionVirtualChannels returns the virtual channel names of every
// cached collection, concatenated into one slice (never nil).
func (mt *metaTable) ListCollectionVirtualChannels() []string {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	channels := make([]string, 0)
	for collID := range mt.collID2Meta {
		channels = append(channels, mt.collID2Meta[collID].VirtualChannelNames...)
	}
	return channels
}

// ListCollectionPhysicalChannels returns the physical channel names of every
// cached collection, concatenated into one slice (never nil).
func (mt *metaTable) ListCollectionPhysicalChannels() []string {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	channels := make([]string, 0)
	for collID := range mt.collID2Meta {
		channels = append(channels, mt.collID2Meta[collID].PhysicalChannelNames...)
	}
	return channels
}

N
neza2017 已提交
440
func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string, partitionID typeutil.UniqueID, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, error) {
441 442 443 444
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
	coll, ok := mt.collID2Meta[collID]
	if !ok {
445
		return 0, fmt.Errorf("can't find collection. id = %d", collID)
446 447 448
	}

	// number of partition tags (except _default) should be limited to 4096 by default
N
neza2017 已提交
449
	if int64(len(coll.PartitionIDs)) >= Params.MaxPartitionNum {
450
		return 0, fmt.Errorf("maximum partition's number should be limit to %d", Params.MaxPartitionNum)
451
	}
452

453 454
	if len(coll.PartitionIDs) != len(coll.PartitionNames) {
		return 0, fmt.Errorf("len(coll.PartitionIDs)=%d, len(coll.PartitionNames)=%d", len(coll.PartitionIDs), len(coll.PartitionNames))
455 456 457 458 459
	}

	for idx := range coll.PartitionIDs {
		if coll.PartitionIDs[idx] == partitionID {
			return 0, fmt.Errorf("partition id = %d already exists", partitionID)
Z
zhenshan.cao 已提交
460
		}
461
		if coll.PartitionNames[idx] == partitionName {
462
			return 0, fmt.Errorf("partition name = %s already exists", partitionName)
463
		}
464

465 466
	}
	coll.PartitionIDs = append(coll.PartitionIDs, partitionID)
467
	coll.PartitionNames = append(coll.PartitionNames, partitionName)
Z
zhenshan.cao 已提交
468 469
	mt.collID2Meta[collID] = coll

N
neza2017 已提交
470
	k1 := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
Z
zhenshan.cao 已提交
471
	v1 := proto.MarshalTextString(&coll)
472
	meta := map[string]string{k1: v1}
Z
zhenshan.cao 已提交
473

474
	// save ddOpStr into etcd
N
neza2017 已提交
475
	addition := mt.getAdditionKV(ddOpStr, meta)
476

N
neza2017 已提交
477
	ts, err := mt.client.MultiSave(meta, addition)
478 479
	if err != nil {
		_ = mt.reloadFromKV()
480
		return 0, err
481
	}
482
	return ts, nil
483 484
}

485
func (mt *metaTable) GetPartitionNameByID(collID, partitionID typeutil.UniqueID, ts typeutil.Timestamp) (string, error) {
486
	if ts == 0 {
487 488
		mt.ddLock.RLock()
		defer mt.ddLock.RUnlock()
489 490
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
491
			return "", fmt.Errorf("can't find collection id = %d", collID)
492
		}
493 494
		for idx := range collMeta.PartitionIDs {
			if collMeta.PartitionIDs[idx] == partitionID {
495
				return collMeta.PartitionNames[idx], nil
496 497
			}
		}
498
		return "", fmt.Errorf("partition %d does not exist", partitionID)
499 500 501 502
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
503
		return "", err
504
	}
505
	collMeta := pb.CollectionInfo{}
506 507
	err = proto.UnmarshalText(collVal, &collMeta)
	if err != nil {
508
		return "", err
509
	}
510 511
	for idx := range collMeta.PartitionIDs {
		if collMeta.PartitionIDs[idx] == partitionID {
512
			return collMeta.PartitionNames[idx], nil
513
		}
514 515 516 517 518 519 520 521 522
	}
	return "", fmt.Errorf("partition %d does not exist", partitionID)
}

func (mt *metaTable) getPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
	if ts == 0 {
		collMeta, ok := mt.collID2Meta[collID]
		if !ok {
			return 0, fmt.Errorf("can't find collection id = %d", collID)
523
		}
524
		for idx := range collMeta.PartitionIDs {
525
			if collMeta.PartitionNames[idx] == partitionName {
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541
				return collMeta.PartitionIDs[idx], nil
			}
		}
		return 0, fmt.Errorf("partition %s does not exist", partitionName)
	}
	collKey := fmt.Sprintf("%s/%d", CollectionMetaPrefix, collID)
	collVal, err := mt.client.Load(collKey, ts)
	if err != nil {
		return 0, err
	}
	collMeta := pb.CollectionInfo{}
	err = proto.UnmarshalText(collVal, &collMeta)
	if err != nil {
		return 0, err
	}
	for idx := range collMeta.PartitionIDs {
542
		if collMeta.PartitionNames[idx] == partitionName {
543
			return collMeta.PartitionIDs[idx], nil
544 545
		}
	}
546
	return 0, fmt.Errorf("partition %s does not exist", partitionName)
547 548
}

549
func (mt *metaTable) GetPartitionByName(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) (typeutil.UniqueID, error) {
550 551
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
552
	return mt.getPartitionByName(collID, partitionName, ts)
553 554
}

555
func (mt *metaTable) HasPartition(collID typeutil.UniqueID, partitionName string, ts typeutil.Timestamp) bool {
556 557
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
558
	_, err := mt.getPartitionByName(collID, partitionName, ts)
559 560 561
	return err == nil
}

562
//return timestamp, partitionid, error
563
func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName string, ddOpStr func(ts typeutil.Timestamp) (string, error)) (typeutil.Timestamp, typeutil.UniqueID, error) {
564 565 566
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

Z
zhenshan.cao 已提交
567
	if partitionName == Params.DefaultPartitionName {
568
		return 0, 0, fmt.Errorf("default partition cannot be deleted")
569 570 571 572
	}

	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
573
		return 0, 0, fmt.Errorf("can't find collection id = %d", collID)
574 575 576 577 578 579
	}

	// check tag exists
	exist := false

	pd := make([]typeutil.UniqueID, 0, len(collMeta.PartitionIDs))
580
	pn := make([]string, 0, len(collMeta.PartitionNames))
581 582
	var partID typeutil.UniqueID
	for idx := range collMeta.PartitionIDs {
583
		if collMeta.PartitionNames[idx] == partitionName {
584 585 586 587
			partID = collMeta.PartitionIDs[idx]
			exist = true
		} else {
			pd = append(pd, collMeta.PartitionIDs[idx])
588
			pn = append(pn, collMeta.PartitionNames[idx])
589 590 591
		}
	}
	if !exist {
592
		return 0, 0, fmt.Errorf("partition %s does not exist", partitionName)
593
	}
Z
zhenshan.cao 已提交
594
	collMeta.PartitionIDs = pd
595
	collMeta.PartitionNames = pn
Z
zhenshan.cao 已提交
596
	mt.collID2Meta[collID] = collMeta
597

598 599 600 601
	// update segID2IndexMeta and partID2SegID
	if segIDMap, ok := mt.partID2SegID[partID]; ok {
		for segID := range segIDMap {
			delete(mt.segID2IndexMeta, segID)
602 603
		}
	}
604 605
	delete(mt.partID2SegID, partID)

606
	meta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}
607
	delMetaKeys := []string{}
N
neza2017 已提交
608
	for _, idxInfo := range collMeta.FieldIndexes {
609
		k := fmt.Sprintf("%s/%d/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, idxInfo.IndexID, partID)
N
neza2017 已提交
610 611 612
		delMetaKeys = append(delMetaKeys, k)
	}

613
	// save ddOpStr into etcd
N
neza2017 已提交
614
	addition := mt.getAdditionKV(ddOpStr, meta)
615

N
neza2017 已提交
616
	ts, err := mt.client.MultiSaveAndRemoveWithPrefix(meta, delMetaKeys, addition)
617 618
	if err != nil {
		_ = mt.reloadFromKV()
619
		return 0, 0, err
620
	}
621
	return ts, partID, nil
622 623
}

624
func (mt *metaTable) AddIndex(segIdxInfo *pb.SegmentIndexInfo) (typeutil.Timestamp, error) {
625 626
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()
627

628
	collMeta, ok := mt.collID2Meta[segIdxInfo.CollectionID]
629
	if !ok {
630
		return 0, fmt.Errorf("collection id = %d not found", segIdxInfo.CollectionID)
631
	}
632 633 634 635 636 637
	exist := false
	for _, fidx := range collMeta.FieldIndexes {
		if fidx.IndexID == segIdxInfo.IndexID {
			exist = true
			break
		}
638
	}
639 640
	if !exist {
		return 0, fmt.Errorf("index id = %d not found", segIdxInfo.IndexID)
641
	}
642

643 644 645 646
	segIdxMap, ok := mt.segID2IndexMeta[segIdxInfo.SegmentID]
	if !ok {
		idxMap := map[typeutil.UniqueID]pb.SegmentIndexInfo{segIdxInfo.IndexID: *segIdxInfo}
		mt.segID2IndexMeta[segIdxInfo.SegmentID] = idxMap
647 648 649

		segIDMap := map[typeutil.UniqueID]bool{segIdxInfo.SegmentID: true}
		mt.partID2SegID[segIdxInfo.PartitionID] = segIDMap
650 651 652 653 654
	} else {
		tmpInfo, ok := segIdxMap[segIdxInfo.IndexID]
		if ok {
			if SegmentIndexInfoEqual(segIdxInfo, &tmpInfo) {
				if segIdxInfo.BuildID == tmpInfo.BuildID {
655
					log.Debug("Identical SegmentIndexInfo already exist", zap.Int64("IndexID", segIdxInfo.IndexID))
656
					return 0, nil
657 658
				}
				return 0, fmt.Errorf("index id = %d exist", segIdxInfo.IndexID)
659
			}
660 661 662
		}
	}

663
	mt.segID2IndexMeta[segIdxInfo.SegmentID][segIdxInfo.IndexID] = *segIdxInfo
664 665
	mt.partID2SegID[segIdxInfo.PartitionID][segIdxInfo.SegmentID] = true

666
	k := fmt.Sprintf("%s/%d/%d/%d/%d", SegmentIndexMetaPrefix, segIdxInfo.CollectionID, segIdxInfo.IndexID, segIdxInfo.PartitionID, segIdxInfo.SegmentID)
667
	v := proto.MarshalTextString(segIdxInfo)
N
neza2017 已提交
668

669
	ts, err := mt.client.Save(k, v)
N
neza2017 已提交
670 671
	if err != nil {
		_ = mt.reloadFromKV()
672
		return 0, err
N
neza2017 已提交
673
	}
674

675
	return ts, nil
N
neza2017 已提交
676 677
}

678
//return timestamp, index id, is dropped, error
679
func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil.Timestamp, typeutil.UniqueID, bool, error) {
N
neza2017 已提交
680 681 682 683 684
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

	collID, ok := mt.collName2ID[collName]
	if !ok {
685
		return 0, 0, false, fmt.Errorf("collection name = %s not exist", collName)
N
neza2017 已提交
686 687 688
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
689
		return 0, 0, false, fmt.Errorf("collection name  = %s not has meta", collName)
N
neza2017 已提交
690 691 692
	}
	fieldSch, err := mt.unlockGetFieldSchema(collName, fieldName)
	if err != nil {
693
		return 0, 0, false, err
N
neza2017 已提交
694 695 696 697 698 699 700 701 702 703 704
	}
	fieldIdxInfo := make([]*pb.FieldIndexInfo, 0, len(collMeta.FieldIndexes))
	var dropIdxID typeutil.UniqueID
	for i, info := range collMeta.FieldIndexes {
		if info.FiledID != fieldSch.FieldID {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		idxMeta, ok := mt.indexID2Meta[info.IndexID]
		if !ok {
			fieldIdxInfo = append(fieldIdxInfo, info)
N
neza2017 已提交
705
			log.Warn("index id not has meta", zap.Int64("index id", info.IndexID))
N
neza2017 已提交
706 707 708 709 710 711 712 713 714 715 716
			continue
		}
		if idxMeta.IndexName != indexName {
			fieldIdxInfo = append(fieldIdxInfo, info)
			continue
		}
		dropIdxID = info.IndexID
		fieldIdxInfo = append(fieldIdxInfo, collMeta.FieldIndexes[i+1:]...)
		break
	}
	if len(fieldIdxInfo) == len(collMeta.FieldIndexes) {
N
neza2017 已提交
717
		log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName))
718
		return 0, 0, false, nil
N
neza2017 已提交
719 720 721 722 723 724 725
	}
	collMeta.FieldIndexes = fieldIdxInfo
	mt.collID2Meta[collID] = collMeta
	saveMeta := map[string]string{path.Join(CollectionMetaPrefix, strconv.FormatInt(collID, 10)): proto.MarshalTextString(&collMeta)}

	delete(mt.indexID2Meta, dropIdxID)

726 727 728 729 730 731 732
	// update segID2IndexMeta
	for partID := range collMeta.PartitionIDs {
		if segIDMap, ok := mt.partID2SegID[typeutil.UniqueID(partID)]; ok {
			for segID := range segIDMap {
				if segIndexInfos, ok := mt.segID2IndexMeta[segID]; ok {
					delete(segIndexInfos, dropIdxID)
				}
N
neza2017 已提交
733 734 735
			}
		}
	}
736

N
neza2017 已提交
737 738 739 740
	delMeta := []string{
		fmt.Sprintf("%s/%d/%d", SegmentIndexMetaPrefix, collMeta.ID, dropIdxID),
		fmt.Sprintf("%s/%d/%d", IndexMetaPrefix, collMeta.ID, dropIdxID),
	}
N
neza2017 已提交
741

N
neza2017 已提交
742
	ts, err := mt.client.MultiSaveAndRemoveWithPrefix(saveMeta, delMeta, nil)
N
neza2017 已提交
743 744
	if err != nil {
		_ = mt.reloadFromKV()
745
		return 0, 0, false, err
N
neza2017 已提交
746 747
	}

748
	return ts, dropIdxID, true, nil
N
neza2017 已提交
749 750
}

N
neza2017 已提交
751 752 753 754 755 756
func (mt *metaTable) GetSegmentIndexInfoByID(segID typeutil.UniqueID, filedID int64, idxName string) (pb.SegmentIndexInfo, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	segIdxMap, ok := mt.segID2IndexMeta[segID]
	if !ok {
757 758 759 760 761 762 763
		return pb.SegmentIndexInfo{
			SegmentID:   segID,
			FieldID:     filedID,
			IndexID:     0,
			BuildID:     0,
			EnableIndex: false,
		}, nil
N
neza2017 已提交
764
	}
765
	if len(segIdxMap) == 0 {
S
sunby 已提交
766
		return pb.SegmentIndexInfo{}, fmt.Errorf("segment id %d not has any index", segID)
N
neza2017 已提交
767 768
	}

B
bigsheeper 已提交
769
	if filedID == -1 && idxName == "" { // return default index
770
		for _, seg := range segIdxMap {
B
bigsheeper 已提交
771 772 773 774
			info, ok := mt.indexID2Meta[seg.IndexID]
			if ok && info.IndexName == Params.DefaultIndexName {
				return seg, nil
			}
N
neza2017 已提交
775 776
		}
	} else {
777
		for idxID, seg := range segIdxMap {
N
neza2017 已提交
778 779 780 781 782 783 784 785 786 787 788 789
			idxMeta, ok := mt.indexID2Meta[idxID]
			if ok {
				if idxMeta.IndexName != idxName {
					continue
				}
				if seg.FieldID != filedID {
					continue
				}
				return seg, nil
			}
		}
	}
S
sunby 已提交
790
	return pb.SegmentIndexInfo{}, fmt.Errorf("can't find index name = %s on segment = %d, with filed id = %d", idxName, segID, filedID)
N
neza2017 已提交
791 792 793 794 795 796
}

func (mt *metaTable) GetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

N
neza2017 已提交
797 798 799 800
	return mt.unlockGetFieldSchema(collName, fieldName)
}

func (mt *metaTable) unlockGetFieldSchema(collName string, fieldName string) (schemapb.FieldSchema, error) {
N
neza2017 已提交
801 802
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
803
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
804 805 806
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
807
		return schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
808 809 810 811 812 813 814
	}

	for _, field := range collMeta.Schema.Fields {
		if field.Name == fieldName {
			return *field, nil
		}
	}
S
sunby 已提交
815
	return schemapb.FieldSchema{}, fmt.Errorf("collection %s doesn't have filed %s", collName, fieldName)
N
neza2017 已提交
816 817 818 819 820 821
}

//return true/false
func (mt *metaTable) IsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
822 823 824 825
	return mt.unlockIsSegmentIndexed(segID, fieldSchema, indexParams)
}

func (mt *metaTable) unlockIsSegmentIndexed(segID typeutil.UniqueID, fieldSchema *schemapb.FieldSchema, indexParams []*commonpb.KeyValuePair) bool {
N
neza2017 已提交
826 827 828 829 830
	segIdx, ok := mt.segID2IndexMeta[segID]
	if !ok {
		return false
	}
	exist := false
831
	for idxID, meta := range segIdx {
N
neza2017 已提交
832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847
		if meta.FieldID != fieldSchema.FieldID {
			continue
		}
		idxMeta, ok := mt.indexID2Meta[idxID]
		if !ok {
			continue
		}
		if EqualKeyPairArray(indexParams, idxMeta.IndexParams) {
			exist = true
			break
		}
	}
	return exist
}

// return segment ids, type params, error
848
func (mt *metaTable) GetNotIndexedSegments(collName string, fieldName string, idxInfo *pb.IndexInfo, segIDs []typeutil.UniqueID) ([]typeutil.UniqueID, schemapb.FieldSchema, error) {
N
neza2017 已提交
849 850 851
	mt.ddLock.Lock()
	defer mt.ddLock.Unlock()

N
neza2017 已提交
852 853 854
	if idxInfo.IndexParams == nil {
		return nil, schemapb.FieldSchema{}, fmt.Errorf("index param is nil")
	}
N
neza2017 已提交
855 856
	collID, ok := mt.collName2ID[collName]
	if !ok {
S
sunby 已提交
857
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
858 859 860
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
S
sunby 已提交
861
		return nil, schemapb.FieldSchema{}, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
862
	}
N
neza2017 已提交
863
	fieldSchema, err := mt.unlockGetFieldSchema(collName, fieldName)
N
neza2017 已提交
864 865 866 867
	if err != nil {
		return nil, fieldSchema, err
	}

N
neza2017 已提交
868 869
	var dupIdx typeutil.UniqueID = 0
	for _, f := range collMeta.FieldIndexes {
870 871 872 873
		if info, ok := mt.indexID2Meta[f.IndexID]; ok {
			if info.IndexName == idxInfo.IndexName {
				dupIdx = info.IndexID
				break
N
neza2017 已提交
874 875 876
			}
		}
	}
N
neza2017 已提交
877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892

	exist := false
	var existInfo pb.IndexInfo
	for _, f := range collMeta.FieldIndexes {
		if f.FiledID == fieldSchema.FieldID {
			existInfo, ok = mt.indexID2Meta[f.IndexID]
			if !ok {
				return nil, schemapb.FieldSchema{}, fmt.Errorf("index id = %d not found", f.IndexID)
			}
			if EqualKeyPairArray(existInfo.IndexParams, idxInfo.IndexParams) {
				exist = true
				break
			}
		}
	}
	if !exist {
N
neza2017 已提交
893
		idx := &pb.FieldIndexInfo{
N
neza2017 已提交
894 895
			FiledID: fieldSchema.FieldID,
			IndexID: idxInfo.IndexID,
N
neza2017 已提交
896 897
		}
		collMeta.FieldIndexes = append(collMeta.FieldIndexes, idx)
N
neza2017 已提交
898 899 900 901
		mt.collID2Meta[collMeta.ID] = collMeta
		k1 := path.Join(CollectionMetaPrefix, strconv.FormatInt(collMeta.ID, 10))
		v1 := proto.MarshalTextString(&collMeta)

N
neza2017 已提交
902 903
		mt.indexID2Meta[idx.IndexID] = *idxInfo
		k2 := path.Join(IndexMetaPrefix, strconv.FormatInt(idx.IndexID, 10))
Z
zhenshan.cao 已提交
904
		v2 := proto.MarshalTextString(idxInfo)
N
neza2017 已提交
905 906
		meta := map[string]string{k1: v1, k2: v2}

N
neza2017 已提交
907 908 909 910 911 912 913 914 915
		if dupIdx != 0 {
			dupInfo := mt.indexID2Meta[dupIdx]
			dupInfo.IndexName = dupInfo.IndexName + "_bak"
			mt.indexID2Meta[dupIdx] = dupInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
			v := proto.MarshalTextString(&dupInfo)
			meta[k] = v
		}

N
neza2017 已提交
916
		_, err = mt.client.MultiSave(meta, nil)
N
neza2017 已提交
917 918 919 920 921
		if err != nil {
			_ = mt.reloadFromKV()
			return nil, schemapb.FieldSchema{}, err
		}

N
neza2017 已提交
922 923 924 925 926 927 928
	} else {
		idxInfo.IndexID = existInfo.IndexID
		if existInfo.IndexName != idxInfo.IndexName { //replace index name
			existInfo.IndexName = idxInfo.IndexName
			mt.indexID2Meta[existInfo.IndexID] = existInfo
			k := path.Join(IndexMetaPrefix, strconv.FormatInt(existInfo.IndexID, 10))
			v := proto.MarshalTextString(&existInfo)
N
neza2017 已提交
929 930 931 932 933 934 935 936 937 938
			meta := map[string]string{k: v}
			if dupIdx != 0 {
				dupInfo := mt.indexID2Meta[dupIdx]
				dupInfo.IndexName = dupInfo.IndexName + "_bak"
				mt.indexID2Meta[dupIdx] = dupInfo
				k := path.Join(IndexMetaPrefix, strconv.FormatInt(dupInfo.IndexID, 10))
				v := proto.MarshalTextString(&dupInfo)
				meta[k] = v
			}

N
neza2017 已提交
939
			_, err = mt.client.MultiSave(meta, nil)
N
neza2017 已提交
940 941 942 943 944
			if err != nil {
				_ = mt.reloadFromKV()
				return nil, schemapb.FieldSchema{}, err
			}
		}
N
neza2017 已提交
945 946
	}

N
neza2017 已提交
947
	rstID := make([]typeutil.UniqueID, 0, 16)
948 949 950
	for _, segID := range segIDs {
		if exist := mt.unlockIsSegmentIndexed(segID, &fieldSchema, idxInfo.IndexParams); !exist {
			rstID = append(rstID, segID)
N
neza2017 已提交
951 952 953 954 955
		}
	}
	return rstID, fieldSchema, nil
}

956
func (mt *metaTable) GetIndexByName(collName, indexName string) (pb.CollectionInfo, []pb.IndexInfo, error) {
N
neza2017 已提交
957
	mt.ddLock.RLock()
S
sunby 已提交
958
	defer mt.ddLock.RUnlock()
N
neza2017 已提交
959 960 961

	collID, ok := mt.collName2ID[collName]
	if !ok {
962
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
963 964 965
	}
	collMeta, ok := mt.collID2Meta[collID]
	if !ok {
966
		return pb.CollectionInfo{}, nil, fmt.Errorf("collection %s not found", collName)
N
neza2017 已提交
967 968
	}

N
neza2017 已提交
969
	rstIndex := make([]pb.IndexInfo, 0, len(collMeta.FieldIndexes))
Z
zhenshan.cao 已提交
970
	for _, idx := range collMeta.FieldIndexes {
971 972
		idxInfo, ok := mt.indexID2Meta[idx.IndexID]
		if !ok {
973
			return pb.CollectionInfo{}, nil, fmt.Errorf("index id = %d not found", idx.IndexID)
974 975 976
		}
		if indexName == "" || idxInfo.IndexName == indexName {
			rstIndex = append(rstIndex, idxInfo)
N
neza2017 已提交
977 978
		}
	}
979
	return collMeta, rstIndex, nil
N
neza2017 已提交
980
}
B
bigsheeper 已提交
981 982 983

func (mt *metaTable) GetIndexByID(indexID typeutil.UniqueID) (*pb.IndexInfo, error) {
	mt.ddLock.RLock()
S
sunby 已提交
984
	defer mt.ddLock.RUnlock()
B
bigsheeper 已提交
985 986 987

	indexInfo, ok := mt.indexID2Meta[indexID]
	if !ok {
S
sunby 已提交
988
		return nil, fmt.Errorf("cannot find index, id = %d", indexID)
B
bigsheeper 已提交
989 990 991
	}
	return &indexInfo, nil
}
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017

// dupMeta returns fresh maps holding the current contents of collID2Meta,
// segID2IndexMeta and indexID2Meta, taken under the read side of ddLock.
//
// The maps themselves (including the nested per-segment maps) are new, but the
// values are copied by plain assignment, so any pointers or slices inside them
// are still shared with the originals.
func (mt *metaTable) dupMeta() (
	map[typeutil.UniqueID]pb.CollectionInfo,
	map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo,
	map[typeutil.UniqueID]pb.IndexInfo,
) {
	mt.ddLock.RLock()
	defer mt.ddLock.RUnlock()

	colls := make(map[typeutil.UniqueID]pb.CollectionInfo, len(mt.collID2Meta))
	for collID, info := range mt.collID2Meta {
		colls[collID] = info
	}

	segIndexes := make(map[typeutil.UniqueID]map[typeutil.UniqueID]pb.SegmentIndexInfo, len(mt.segID2IndexMeta))
	for segID, byIndex := range mt.segID2IndexMeta {
		inner := make(map[typeutil.UniqueID]pb.SegmentIndexInfo, len(byIndex))
		for indexID, info := range byIndex {
			inner[indexID] = info
		}
		segIndexes[segID] = inner
	}

	indexes := make(map[typeutil.UniqueID]pb.IndexInfo, len(mt.indexID2Meta))
	for indexID, info := range mt.indexID2Meta {
		indexes[indexID] = info
	}

	return colls, segIndexes, indexes
}