// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy
18 19

import (
G
godchen 已提交
20
	"context"
S
sunby 已提交
21 22
	"errors"
	"fmt"
23
	"strconv"
24
	"sync"
25

Z
zhenshan.cao 已提交
26 27
	"go.uber.org/zap"

28
	"github.com/milvus-io/milvus/internal/common"
29
	"github.com/milvus-io/milvus/internal/log"
30
	"github.com/milvus-io/milvus/internal/metrics"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/proto/commonpb"
32
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
34
	"github.com/milvus-io/milvus/internal/proto/querypb"
35
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
X
Xiangyu Wang 已提交
36 37
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/types"
38
	"github.com/milvus-io/milvus/internal/util/timerecord"
X
Xiangyu Wang 已提交
39
	"github.com/milvus-io/milvus/internal/util/typeutil"
40 41
)

42
// Cache is the interface for system meta data cache
43
type Cache interface {
44
	// GetCollectionID get collection's id by name.
G
godchen 已提交
45
	GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error)
46
	// GetCollectionInfo get collection's information by name, such as collection id, schema, and etc.
47
	GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error)
48
	// GetPartitionID get partition's identifier of specific collection.
G
godchen 已提交
49
	GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error)
50
	// GetPartitions get all partitions' id of specific collection.
Z
zhenshan.cao 已提交
51
	GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error)
52
	// GetPartitionInfo get partition's info.
53
	GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error)
54
	// GetCollectionSchema get collection's schema.
G
godchen 已提交
55
	GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error)
56
	GetShards(ctx context.Context, withCache bool, collectionName string, qc types.QueryCoord) ([]*querypb.ShardLeadersList, error)
G
godchen 已提交
57 58
	RemoveCollection(ctx context.Context, collectionName string)
	RemovePartition(ctx context.Context, collectionName string, partitionName string)
59 60 61 62 63 64 65

	// GetCredentialInfo operate credential cache
	GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error)
	RemoveCredential(username string)
	UpdateCredential(credInfo *internalpb.CredentialInfo)
	GetCredUsernames(ctx context.Context) ([]string, error)
	ClearCredUsers()
G
godchen 已提交
66 67 68
}

type collectionInfo struct {
69 70 71
	collID              typeutil.UniqueID
	schema              *schemapb.CollectionSchema
	partInfo            map[string]*partitionInfo
72
	shardLeaders        []*querypb.ShardLeadersList
73 74 75 76 77 78 79 80
	createdTimestamp    uint64
	createdUtcTimestamp uint64
}

type partitionInfo struct {
	partitionID         typeutil.UniqueID
	createdTimestamp    uint64
	createdUtcTimestamp uint64
G
godchen 已提交
81 82
}

83
// make sure MetaCache implements Cache.
84 85
var _ Cache = (*MetaCache)(nil)

86
// MetaCache implements Cache, provides collection meta cache based on internal RootCoord
G
godchen 已提交
87
type MetaCache struct {
88
	client types.RootCoord
G
godchen 已提交
89

90 91 92 93 94
	collInfo         map[string]*collectionInfo
	credMap          map[string]*internalpb.CredentialInfo // cache for credential, lazy load
	credUsernameList []string                              // no need initialize when NewMetaCache
	mu               sync.RWMutex
	credMut          sync.RWMutex
95 96
}

97
// globalMetaCache is singleton instance of Cache
98
var globalMetaCache Cache
99

100
// InitMetaCache initializes globalMetaCache
101
func InitMetaCache(client types.RootCoord) error {
G
godchen 已提交
102 103 104 105 106 107
	var err error
	globalMetaCache, err = NewMetaCache(client)
	if err != nil {
		return err
	}
	return nil
108 109
}

110
// NewMetaCache creates a MetaCache with provided RootCoord
111
func NewMetaCache(client types.RootCoord) (*MetaCache, error) {
G
godchen 已提交
112 113 114
	return &MetaCache{
		client:   client,
		collInfo: map[string]*collectionInfo{},
115
		credMap:  map[string]*internalpb.CredentialInfo{},
G
godchen 已提交
116
	}, nil
117 118
}

119
// GetCollectionID returns the corresponding collection id for provided collection name
Z
zhenshan.cao 已提交
120
func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
G
godchen 已提交
121 122
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
123

124
	if !ok {
X
Xiaofan 已提交
125
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc()
126
		tr := timerecord.NewTimeRecorder("UpdateCache")
Z
zhenshan.cao 已提交
127 128 129 130 131 132 133 134
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return 0, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
X
Xiaofan 已提交
135
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
Z
zhenshan.cao 已提交
136 137
		collInfo = m.collInfo[collectionName]
		return collInfo.collID, nil
138
	}
Z
zhenshan.cao 已提交
139
	defer m.mu.RUnlock()
X
Xiaofan 已提交
140
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionID", metrics.CacheHitLabel).Inc()
Z
zhenshan.cao 已提交
141

G
godchen 已提交
142
	return collInfo.collID, nil
143 144
}

145 146
// GetCollectionInfo returns the collection information related to provided collection name
// If the information is not found, proxy will try to fetch information for other source (RootCoord for now)
147 148 149 150 151 152 153
func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error) {
	m.mu.RLock()
	var collInfo *collectionInfo
	collInfo, ok := m.collInfo[collectionName]
	m.mu.RUnlock()

	if !ok {
154
		tr := timerecord.NewTimeRecorder("UpdateCache")
X
Xiaofan 已提交
155
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc()
156 157 158 159 160 161 162 163
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return nil, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
X
Xiaofan 已提交
164
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
165
	}
166

X
Xiaofan 已提交
167
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
168 169 170 171 172 173
	return &collectionInfo{
		collID:              collInfo.collID,
		schema:              collInfo.schema,
		partInfo:            collInfo.partInfo,
		createdTimestamp:    collInfo.createdTimestamp,
		createdUtcTimestamp: collInfo.createdUtcTimestamp,
174
		shardLeaders:        collInfo.shardLeaders,
175 176 177
	}, nil
}

Z
zhenshan.cao 已提交
178
func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error) {
G
godchen 已提交
179 180
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
181

G
godchen 已提交
182
	if !ok {
X
Xiaofan 已提交
183
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc()
184
		tr := timerecord.NewTimeRecorder("UpdateCache")
Z
zhenshan.cao 已提交
185 186 187
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
188 189 190
			log.Warn("Failed to load collection from rootcoord ",
				zap.String("collection name ", collectionName),
				zap.Error(err))
Z
zhenshan.cao 已提交
191 192 193 194 195 196
			return nil, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
X
Xiaofan 已提交
197
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
198
		log.Debug("Reload collection from root coordinator ",
199
			zap.String("collection name ", collectionName),
200
			zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
Z
zhenshan.cao 已提交
201
		return collInfo.schema, nil
G
godchen 已提交
202
	}
Z
zhenshan.cao 已提交
203
	defer m.mu.RUnlock()
X
Xiaofan 已提交
204
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc()
Z
zhenshan.cao 已提交
205

G
godchen 已提交
206 207 208
	return collInfo.schema, nil
}

Z
zhenshan.cao 已提交
209 210 211 212 213 214 215
func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse, collectionName string) {
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{}
	}
	m.collInfo[collectionName].schema = coll.Schema
	m.collInfo[collectionName].collID = coll.CollectionID
216 217
	m.collInfo[collectionName].createdTimestamp = coll.CreatedTimestamp
	m.collInfo[collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
Z
zhenshan.cao 已提交
218 219 220
}

func (m *MetaCache) GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
221
	partInfo, err := m.GetPartitionInfo(ctx, collectionName, partitionName)
Z
zhenshan.cao 已提交
222 223 224
	if err != nil {
		return 0, err
	}
225 226 227 228 229 230 231 232
	return partInfo.partitionID, nil
}

func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error) {
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
	}
Z
zhenshan.cao 已提交
233

G
godchen 已提交
234 235
	m.mu.RLock()

C
cai.zhang 已提交
236 237
	collInfo, ok := m.collInfo[collectionName]
	if !ok {
Z
zhenshan.cao 已提交
238
		m.mu.RUnlock()
239
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
C
cai.zhang 已提交
240 241
	}

242
	if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 {
243
		tr := timerecord.NewTimeRecorder("UpdateCache")
X
Xiaofan 已提交
244
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc()
245
		m.mu.RUnlock()
Z
zhenshan.cao 已提交
246 247 248

		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
249
			return nil, err
Z
zhenshan.cao 已提交
250 251 252 253
		}

		m.mu.Lock()
		defer m.mu.Unlock()
254

255 256 257 258
		err = m.updatePartitions(partitions, collectionName)
		if err != nil {
			return nil, err
		}
X
Xiaofan 已提交
259
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
260
		log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.Any("collectionName", collectionName))
261
		ret := make(map[string]typeutil.UniqueID)
Z
zhenshan.cao 已提交
262
		partInfo := m.collInfo[collectionName].partInfo
263 264
		for k, v := range partInfo {
			ret[k] = v.partitionID
Z
zhenshan.cao 已提交
265
		}
266 267
		return ret, nil

G
godchen 已提交
268
	}
269
	defer m.mu.RUnlock()
X
Xiaofan 已提交
270
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc()
271 272 273 274 275 276 277 278

	ret := make(map[string]typeutil.UniqueID)
	partInfo := m.collInfo[collectionName].partInfo
	for k, v := range partInfo {
		ret[k] = v.partitionID
	}

	return ret, nil
G
godchen 已提交
279 280
}

281
func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error) {
Z
zhenshan.cao 已提交
282 283 284
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
285
	}
D
dragondriver 已提交
286

Z
zhenshan.cao 已提交
287 288 289 290 291 292
	m.mu.RLock()

	collInfo, ok := m.collInfo[collectionName]
	if !ok {
		m.mu.RUnlock()
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
G
godchen 已提交
293
	}
Z
zhenshan.cao 已提交
294

295 296 297
	var partInfo *partitionInfo
	partInfo, ok = collInfo.partInfo[partitionName]
	m.mu.RUnlock()
Z
zhenshan.cao 已提交
298

299
	if !ok {
300
		tr := timerecord.NewTimeRecorder("UpdateCache")
X
Xiaofan 已提交
301
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc()
Z
zhenshan.cao 已提交
302 303 304 305 306 307 308
		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
			return nil, err
		}

		m.mu.Lock()
		defer m.mu.Unlock()
309 310 311 312
		err = m.updatePartitions(partitions, collectionName)
		if err != nil {
			return nil, err
		}
X
Xiaofan 已提交
313
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
314 315 316 317
		log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
		partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
		if !ok {
			return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
Z
zhenshan.cao 已提交
318
		}
N
neza2017 已提交
319
	}
X
Xiaofan 已提交
320
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()
321 322 323 324 325
	return &partitionInfo{
		partitionID:         partInfo.partitionID,
		createdTimestamp:    partInfo.createdTimestamp,
		createdUtcTimestamp: partInfo.createdUtcTimestamp,
	}, nil
G
godchen 已提交
326 327
}

328
// Get the collection information from rootcoord.
Z
zhenshan.cao 已提交
329
func (m *MetaCache) describeCollection(ctx context.Context, collectionName string) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
330 331
	req := &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
332
			MsgType: commonpb.MsgType_DescribeCollection,
G
godchen 已提交
333 334 335
		},
		CollectionName: collectionName,
	}
G
godchen 已提交
336
	coll, err := m.client.DescribeCollection(ctx, req)
G
godchen 已提交
337 338 339
	if err != nil {
		return nil, err
	}
340
	if coll.Status.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
341
		return nil, errors.New(coll.Status.Reason)
G
godchen 已提交
342
	}
343 344 345 346 347 348 349 350 351 352 353
	resp := &milvuspb.DescribeCollectionResponse{
		Status: coll.Status,
		Schema: &schemapb.CollectionSchema{
			Name:        coll.Schema.Name,
			Description: coll.Schema.Description,
			AutoID:      coll.Schema.AutoID,
			Fields:      make([]*schemapb.FieldSchema, 0),
		},
		CollectionID:         coll.CollectionID,
		VirtualChannelNames:  coll.VirtualChannelNames,
		PhysicalChannelNames: coll.PhysicalChannelNames,
354 355
		CreatedTimestamp:     coll.CreatedTimestamp,
		CreatedUtcTimestamp:  coll.CreatedUtcTimestamp,
356 357
	}
	for _, field := range coll.Schema.Fields {
358
		if field.FieldID >= common.StartOfUserFieldID {
359 360 361 362
			resp.Schema.Fields = append(resp.Schema.Fields, field)
		}
	}
	return resp, nil
363 364
}

Z
zhenshan.cao 已提交
365
func (m *MetaCache) showPartitions(ctx context.Context, collectionName string) (*milvuspb.ShowPartitionsResponse, error) {
G
godchen 已提交
366
	req := &milvuspb.ShowPartitionsRequest{
G
godchen 已提交
367
		Base: &commonpb.MsgBase{
368
			MsgType: commonpb.MsgType_ShowPartitions,
G
godchen 已提交
369 370 371
		},
		CollectionName: collectionName,
	}
Z
zhenshan.cao 已提交
372

G
godchen 已提交
373
	partitions, err := m.client.ShowPartitions(ctx, req)
G
godchen 已提交
374
	if err != nil {
Z
zhenshan.cao 已提交
375
		return nil, err
G
godchen 已提交
376
	}
377
	if partitions.Status.ErrorCode != commonpb.ErrorCode_Success {
Z
zhenshan.cao 已提交
378
		return nil, fmt.Errorf("%s", partitions.Status.Reason)
G
godchen 已提交
379
	}
B
bigsheeper 已提交
380

G
godchen 已提交
381
	if len(partitions.PartitionIDs) != len(partitions.PartitionNames) {
Z
zhenshan.cao 已提交
382
		return nil, fmt.Errorf("partition ids len: %d doesn't equal Partition name len %d",
G
godchen 已提交
383 384
			len(partitions.PartitionIDs), len(partitions.PartitionNames))
	}
C
cai.zhang 已提交
385

Z
zhenshan.cao 已提交
386 387 388
	return partitions, nil
}

389
func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse, collectionName string) error {
C
cai.zhang 已提交
390 391 392
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{
393
			partInfo: map[string]*partitionInfo{},
C
cai.zhang 已提交
394 395 396
		}
	}
	partInfo := m.collInfo[collectionName].partInfo
G
godchen 已提交
397
	if partInfo == nil {
398
		partInfo = map[string]*partitionInfo{}
G
godchen 已提交
399
	}
400

401 402 403 404 405
	// check partitionID, createdTimestamp and utcstamp has sam element numbers
	if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
		return errors.New("partition names and timestamps number is not aligned, response " + partitions.String())
	}

G
godchen 已提交
406
	for i := 0; i < len(partitions.PartitionIDs); i++ {
407 408 409 410 411 412
		if _, ok := partInfo[partitions.PartitionNames[i]]; !ok {
			partInfo[partitions.PartitionNames[i]] = &partitionInfo{
				partitionID:         partitions.PartitionIDs[i],
				createdTimestamp:    partitions.CreatedTimestamps[i],
				createdUtcTimestamp: partitions.CreatedUtcTimestamps[i],
			}
G
godchen 已提交
413 414
		}
	}
Z
zhenshan.cao 已提交
415
	m.collInfo[collectionName].partInfo = partInfo
416
	return nil
417 418
}

G
godchen 已提交
419
func (m *MetaCache) RemoveCollection(ctx context.Context, collectionName string) {
G
godchen 已提交
420 421 422
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.collInfo, collectionName)
423 424
}

G
godchen 已提交
425
func (m *MetaCache) RemovePartition(ctx context.Context, collectionName, partitionName string) {
G
godchen 已提交
426 427
	m.mu.Lock()
	defer m.mu.Unlock()
C
cai.zhang 已提交
428 429 430 431 432 433 434 435 436
	_, ok := m.collInfo[collectionName]
	if !ok {
		return
	}
	partInfo := m.collInfo[collectionName].partInfo
	if partInfo == nil {
		return
	}
	delete(partInfo, partitionName)
437
}
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527

// GetCredentialInfo returns a copy of the credential of username.
//
// On a cache hit the copy is taken while the read lock is held, because
// UpdateCredential rewrites cached entries in place. On a miss the credential
// is requested from RootCoord, stored through UpdateCredential, and a copy is
// returned.
//
// When the RootCoord call fails, the error is returned together with an empty,
// non-nil CredentialInfo.
func (m *MetaCache) GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) {
	m.credMut.RLock()
	if cached, ok := m.credMap[username]; ok {
		snapshot := &internalpb.CredentialInfo{
			Username:          cached.Username,
			EncryptedPassword: cached.EncryptedPassword,
		}
		m.credMut.RUnlock()
		return snapshot, nil
	}
	m.credMut.RUnlock()

	req := &rootcoordpb.GetCredentialRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_GetCredential,
		},
		Username: username,
	}
	resp, err := m.client.GetCredential(ctx, req)
	if err != nil {
		return &internalpb.CredentialInfo{}, err
	}
	// NOTE(review): the response is cached without inspecting any status it
	// may carry - confirm whether a status check is needed here.
	credInfo := &internalpb.CredentialInfo{
		Username:          resp.Username,
		EncryptedPassword: resp.Password,
	}
	m.UpdateCredential(credInfo)

	return &internalpb.CredentialInfo{
		Username:          credInfo.Username,
		EncryptedPassword: credInfo.EncryptedPassword,
	}, nil
}

// ClearCredUsers invalidates the cached username list by resetting it to nil,
// so the next GetCredUsernames call fetches it again.
func (m *MetaCache) ClearCredUsers() {
	m.credMut.Lock()
	m.credUsernameList = nil
	m.credMut.Unlock()
}

// RemoveCredential drops the credential cached for username and invalidates
// the cached username list.
func (m *MetaCache) RemoveCredential(username string) {
	m.credMut.Lock()
	defer m.credMut.Unlock()

	// The username list may still mention the removed user, so reset it too.
	m.credUsernameList = nil
	delete(m.credMap, username)
}

// UpdateCredential stores the username and encrypted password of credInfo in
// the credential cache, keyed by credInfo.Username. An existing entry is
// rewritten in place; otherwise a new entry is created. The cache never keeps
// a reference to credInfo itself.
func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) {
	m.credMut.Lock()
	defer m.credMut.Unlock()

	name, encrypted := credInfo.Username, credInfo.EncryptedPassword
	entry, exists := m.credMap[name]
	if !exists {
		entry = &internalpb.CredentialInfo{}
		m.credMap[name] = entry
	}
	entry.Username = name
	entry.EncryptedPassword = encrypted
}

// UpdateCredUsersListCache replaces the cached username list with usernames.
// The slice is stored as is, without copying.
func (m *MetaCache) UpdateCredUsersListCache(usernames []string) {
	m.credMut.Lock()
	m.credUsernameList = usernames
	m.credMut.Unlock()
}

// GetCredUsernames returns the list of usernames. A non-nil cached list is
// returned directly; otherwise the list is requested from RootCoord, stored
// through UpdateCredUsersListCache and returned.
func (m *MetaCache) GetCredUsernames(ctx context.Context) ([]string, error) {
	m.credMut.RLock()
	cached := m.credUsernameList
	m.credMut.RUnlock()

	if cached != nil {
		return cached, nil
	}

	resp, err := m.client.ListCredUsers(ctx, &milvuspb.ListCredUsersRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_ListCredUsernames,
		},
	})
	if err != nil {
		return nil, err
	}

	fetched := resp.Usernames
	m.UpdateCredUsersListCache(fetched)
	return fetched, nil
}
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548

// GetShards update cache if withCache == false
func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionName string, qc types.QueryCoord) ([]*querypb.ShardLeadersList, error) {
	info, err := m.GetCollectionInfo(ctx, collectionName)
	if err != nil {
		return nil, err
	}

	if withCache {
		if len(info.shardLeaders) > 0 {
			return info.shardLeaders, nil
		}
		log.Info("no shard cache for collection, try to get shard leaders from QueryCoord",
			zap.String("collectionName", collectionName))
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	req := &querypb.GetShardLeadersRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_GetShardLeaders,
X
Xiaofan 已提交
549
			SourceID: Params.ProxyCfg.GetNodeID(),
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565
		},
		CollectionID: info.collID,
	}
	resp, err := qc.GetShardLeaders(ctx, req)
	if err != nil {
		return nil, err
	}
	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
		return nil, fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)
	}

	shards := resp.GetShards()

	m.collInfo[collectionName].shardLeaders = shards
	return shards, nil
}