meta_cache.go 17.7 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

C
Cai Yudong 已提交
17
package proxy
18 19

import (
G
godchen 已提交
20
	"context"
S
sunby 已提交
21 22
	"errors"
	"fmt"
23
	"strconv"
24
	"sync"
25

Z
zhenshan.cao 已提交
26 27
	"go.uber.org/zap"

28
	"github.com/milvus-io/milvus/internal/common"
29
	"github.com/milvus-io/milvus/internal/log"
30
	"github.com/milvus-io/milvus/internal/metrics"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/proto/commonpb"
32
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
34
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
X
Xiangyu Wang 已提交
35 36
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/types"
37
	"github.com/milvus-io/milvus/internal/util/timerecord"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/util/typeutil"
39 40
)

41
// Cache is the interface for system meta data cache
42
type Cache interface {
43
	// GetCollectionID get collection's id by name.
G
godchen 已提交
44
	GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error)
45
	// GetCollectionInfo get collection's information by name, such as collection id, schema, and etc.
46
	GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error)
47
	// GetPartitionID get partition's identifier of specific collection.
G
godchen 已提交
48
	GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error)
49
	// GetPartitions get all partitions' id of specific collection.
Z
zhenshan.cao 已提交
50
	GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error)
51
	// GetPartitionInfo get partition's info.
52
	GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error)
53
	// GetCollectionSchema get collection's schema.
G
godchen 已提交
54 55 56
	GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error)
	RemoveCollection(ctx context.Context, collectionName string)
	RemovePartition(ctx context.Context, collectionName string, partitionName string)
57 58 59 60 61 62 63

	// GetCredentialInfo operate credential cache
	GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error)
	RemoveCredential(username string)
	UpdateCredential(credInfo *internalpb.CredentialInfo)
	GetCredUsernames(ctx context.Context) ([]string, error)
	ClearCredUsers()
G
godchen 已提交
64 65 66
}

type collectionInfo struct {
67 68 69 70 71 72 73 74 75 76 77
	collID              typeutil.UniqueID
	schema              *schemapb.CollectionSchema
	partInfo            map[string]*partitionInfo
	createdTimestamp    uint64
	createdUtcTimestamp uint64
}

type partitionInfo struct {
	partitionID         typeutil.UniqueID
	createdTimestamp    uint64
	createdUtcTimestamp uint64
G
godchen 已提交
78 79
}

80
// make sure MetaCache implements Cache.
81 82
var _ Cache = (*MetaCache)(nil)

83
// MetaCache implements Cache, provides collection meta cache based on internal RootCoord
G
godchen 已提交
84
type MetaCache struct {
85
	client types.RootCoord
G
godchen 已提交
86

87 88 89 90 91
	collInfo         map[string]*collectionInfo
	credMap          map[string]*internalpb.CredentialInfo // cache for credential, lazy load
	credUsernameList []string                              // no need initialize when NewMetaCache
	mu               sync.RWMutex
	credMut          sync.RWMutex
92 93
}

94
// globalMetaCache is singleton instance of Cache
95
var globalMetaCache Cache
96

97
// InitMetaCache initializes globalMetaCache
98
func InitMetaCache(client types.RootCoord) error {
G
godchen 已提交
99 100 101 102 103 104
	var err error
	globalMetaCache, err = NewMetaCache(client)
	if err != nil {
		return err
	}
	return nil
105 106
}

107
// NewMetaCache creates a MetaCache with provided RootCoord
108
func NewMetaCache(client types.RootCoord) (*MetaCache, error) {
G
godchen 已提交
109 110 111
	return &MetaCache{
		client:   client,
		collInfo: map[string]*collectionInfo{},
112
		credMap:  map[string]*internalpb.CredentialInfo{},
G
godchen 已提交
113
	}, nil
114 115
}

116
// GetCollectionID returns the corresponding collection id for provided collection name
Z
zhenshan.cao 已提交
117
func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
G
godchen 已提交
118 119
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
120

121
	if !ok {
122 123
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GeCollectionID", metrics.CacheMissLabel).Inc()
		tr := timerecord.NewTimeRecorder("UpdateCache")
Z
zhenshan.cao 已提交
124 125 126 127 128 129 130 131
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return 0, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
132
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
Z
zhenshan.cao 已提交
133 134
		collInfo = m.collInfo[collectionName]
		return collInfo.collID, nil
135
	}
Z
zhenshan.cao 已提交
136
	defer m.mu.RUnlock()
137
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollectionID", metrics.CacheHitLabel).Inc()
Z
zhenshan.cao 已提交
138

G
godchen 已提交
139
	return collInfo.collID, nil
140 141
}

142 143
// GetCollectionInfo returns the collection information related to provided collection name
// If the information is not found, proxy will try to fetch information for other source (RootCoord for now)
144 145 146 147 148 149 150
func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error) {
	m.mu.RLock()
	var collInfo *collectionInfo
	collInfo, ok := m.collInfo[collectionName]
	m.mu.RUnlock()

	if !ok {
151 152
		tr := timerecord.NewTimeRecorder("UpdateCache")
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc()
153 154 155 156 157 158 159 160
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return nil, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
161
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
162
	}
163
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
164 165 166 167 168 169 170 171 172
	return &collectionInfo{
		collID:              collInfo.collID,
		schema:              collInfo.schema,
		partInfo:            collInfo.partInfo,
		createdTimestamp:    collInfo.createdTimestamp,
		createdUtcTimestamp: collInfo.createdUtcTimestamp,
	}, nil
}

Z
zhenshan.cao 已提交
173
func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error) {
G
godchen 已提交
174 175
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
176

G
godchen 已提交
177
	if !ok {
178 179
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc()
		tr := timerecord.NewTimeRecorder("UpdateCache")
Z
zhenshan.cao 已提交
180 181 182
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
183 184 185
			log.Warn("Failed to load collection from rootcoord ",
				zap.String("collection name ", collectionName),
				zap.Error(err))
Z
zhenshan.cao 已提交
186 187 188 189 190 191
			return nil, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
192 193
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
		log.Debug("Reload collection from root coordinator ",
194
			zap.String("collection name ", collectionName),
195
			zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
Z
zhenshan.cao 已提交
196
		return collInfo.schema, nil
G
godchen 已提交
197
	}
Z
zhenshan.cao 已提交
198
	defer m.mu.RUnlock()
199
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc()
Z
zhenshan.cao 已提交
200

G
godchen 已提交
201 202 203
	return collInfo.schema, nil
}

Z
zhenshan.cao 已提交
204 205 206 207 208 209 210
func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse, collectionName string) {
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{}
	}
	m.collInfo[collectionName].schema = coll.Schema
	m.collInfo[collectionName].collID = coll.CollectionID
211 212
	m.collInfo[collectionName].createdTimestamp = coll.CreatedTimestamp
	m.collInfo[collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
Z
zhenshan.cao 已提交
213 214 215
}

func (m *MetaCache) GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
216
	partInfo, err := m.GetPartitionInfo(ctx, collectionName, partitionName)
Z
zhenshan.cao 已提交
217 218 219
	if err != nil {
		return 0, err
	}
220 221 222 223 224 225 226 227
	return partInfo.partitionID, nil
}

func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error) {
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
	}
Z
zhenshan.cao 已提交
228

G
godchen 已提交
229 230
	m.mu.RLock()

C
cai.zhang 已提交
231 232
	collInfo, ok := m.collInfo[collectionName]
	if !ok {
Z
zhenshan.cao 已提交
233
		m.mu.RUnlock()
234
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
C
cai.zhang 已提交
235 236
	}

237
	if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 {
238 239
		tr := timerecord.NewTimeRecorder("UpdateCache")
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetPartitions", metrics.CacheMissLabel).Inc()
240
		m.mu.RUnlock()
Z
zhenshan.cao 已提交
241 242 243

		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
244
			return nil, err
Z
zhenshan.cao 已提交
245 246 247 248
		}

		m.mu.Lock()
		defer m.mu.Unlock()
249

250 251 252 253
		err = m.updatePartitions(partitions, collectionName)
		if err != nil {
			return nil, err
		}
254
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
255
		log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.Any("collectionName", collectionName))
256
		ret := make(map[string]typeutil.UniqueID)
Z
zhenshan.cao 已提交
257
		partInfo := m.collInfo[collectionName].partInfo
258 259
		for k, v := range partInfo {
			ret[k] = v.partitionID
Z
zhenshan.cao 已提交
260
		}
261 262
		return ret, nil

G
godchen 已提交
263
	}
264
	defer m.mu.RUnlock()
265
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetPartitions", metrics.CacheHitLabel).Inc()
266 267 268 269 270 271 272 273

	ret := make(map[string]typeutil.UniqueID)
	partInfo := m.collInfo[collectionName].partInfo
	for k, v := range partInfo {
		ret[k] = v.partitionID
	}

	return ret, nil
G
godchen 已提交
274 275
}

276
func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error) {
Z
zhenshan.cao 已提交
277 278 279
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
280
	}
D
dragondriver 已提交
281

Z
zhenshan.cao 已提交
282 283 284 285 286 287
	m.mu.RLock()

	collInfo, ok := m.collInfo[collectionName]
	if !ok {
		m.mu.RUnlock()
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
G
godchen 已提交
288
	}
Z
zhenshan.cao 已提交
289

290 291 292
	var partInfo *partitionInfo
	partInfo, ok = collInfo.partInfo[partitionName]
	m.mu.RUnlock()
Z
zhenshan.cao 已提交
293

294
	if !ok {
295 296
		tr := timerecord.NewTimeRecorder("UpdateCache")
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc()
Z
zhenshan.cao 已提交
297 298 299 300 301 302 303
		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
			return nil, err
		}

		m.mu.Lock()
		defer m.mu.Unlock()
304 305 306 307
		err = m.updatePartitions(partitions, collectionName)
		if err != nil {
			return nil, err
		}
308
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
309 310 311 312
		log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
		partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
		if !ok {
			return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
Z
zhenshan.cao 已提交
313
		}
N
neza2017 已提交
314
	}
315
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.ProxyID, 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()
316 317 318 319 320
	return &partitionInfo{
		partitionID:         partInfo.partitionID,
		createdTimestamp:    partInfo.createdTimestamp,
		createdUtcTimestamp: partInfo.createdUtcTimestamp,
	}, nil
G
godchen 已提交
321 322
}

323
// Get the collection information from rootcoord.
Z
zhenshan.cao 已提交
324
func (m *MetaCache) describeCollection(ctx context.Context, collectionName string) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
325 326
	req := &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
327
			MsgType: commonpb.MsgType_DescribeCollection,
G
godchen 已提交
328 329 330
		},
		CollectionName: collectionName,
	}
G
godchen 已提交
331
	coll, err := m.client.DescribeCollection(ctx, req)
G
godchen 已提交
332 333 334
	if err != nil {
		return nil, err
	}
335
	if coll.Status.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
336
		return nil, errors.New(coll.Status.Reason)
G
godchen 已提交
337
	}
338 339 340 341 342 343 344 345 346 347 348
	resp := &milvuspb.DescribeCollectionResponse{
		Status: coll.Status,
		Schema: &schemapb.CollectionSchema{
			Name:        coll.Schema.Name,
			Description: coll.Schema.Description,
			AutoID:      coll.Schema.AutoID,
			Fields:      make([]*schemapb.FieldSchema, 0),
		},
		CollectionID:         coll.CollectionID,
		VirtualChannelNames:  coll.VirtualChannelNames,
		PhysicalChannelNames: coll.PhysicalChannelNames,
349 350
		CreatedTimestamp:     coll.CreatedTimestamp,
		CreatedUtcTimestamp:  coll.CreatedUtcTimestamp,
351 352
	}
	for _, field := range coll.Schema.Fields {
353
		if field.FieldID >= common.StartOfUserFieldID {
354 355 356 357
			resp.Schema.Fields = append(resp.Schema.Fields, field)
		}
	}
	return resp, nil
358 359
}

Z
zhenshan.cao 已提交
360
func (m *MetaCache) showPartitions(ctx context.Context, collectionName string) (*milvuspb.ShowPartitionsResponse, error) {
G
godchen 已提交
361
	req := &milvuspb.ShowPartitionsRequest{
G
godchen 已提交
362
		Base: &commonpb.MsgBase{
363
			MsgType: commonpb.MsgType_ShowPartitions,
G
godchen 已提交
364 365 366
		},
		CollectionName: collectionName,
	}
Z
zhenshan.cao 已提交
367

G
godchen 已提交
368
	partitions, err := m.client.ShowPartitions(ctx, req)
G
godchen 已提交
369
	if err != nil {
Z
zhenshan.cao 已提交
370
		return nil, err
G
godchen 已提交
371
	}
372
	if partitions.Status.ErrorCode != commonpb.ErrorCode_Success {
Z
zhenshan.cao 已提交
373
		return nil, fmt.Errorf("%s", partitions.Status.Reason)
G
godchen 已提交
374
	}
B
bigsheeper 已提交
375

G
godchen 已提交
376
	if len(partitions.PartitionIDs) != len(partitions.PartitionNames) {
Z
zhenshan.cao 已提交
377
		return nil, fmt.Errorf("partition ids len: %d doesn't equal Partition name len %d",
G
godchen 已提交
378 379
			len(partitions.PartitionIDs), len(partitions.PartitionNames))
	}
C
cai.zhang 已提交
380

Z
zhenshan.cao 已提交
381 382 383
	return partitions, nil
}

384
func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse, collectionName string) error {
C
cai.zhang 已提交
385 386 387
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{
388
			partInfo: map[string]*partitionInfo{},
C
cai.zhang 已提交
389 390 391
		}
	}
	partInfo := m.collInfo[collectionName].partInfo
G
godchen 已提交
392
	if partInfo == nil {
393
		partInfo = map[string]*partitionInfo{}
G
godchen 已提交
394
	}
395

396 397 398 399 400
	// check partitionID, createdTimestamp and utcstamp has sam element numbers
	if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
		return errors.New("partition names and timestamps number is not aligned, response " + partitions.String())
	}

G
godchen 已提交
401
	for i := 0; i < len(partitions.PartitionIDs); i++ {
402 403 404 405 406 407
		if _, ok := partInfo[partitions.PartitionNames[i]]; !ok {
			partInfo[partitions.PartitionNames[i]] = &partitionInfo{
				partitionID:         partitions.PartitionIDs[i],
				createdTimestamp:    partitions.CreatedTimestamps[i],
				createdUtcTimestamp: partitions.CreatedUtcTimestamps[i],
			}
G
godchen 已提交
408 409
		}
	}
Z
zhenshan.cao 已提交
410
	m.collInfo[collectionName].partInfo = partInfo
411
	return nil
412 413
}

G
godchen 已提交
414
func (m *MetaCache) RemoveCollection(ctx context.Context, collectionName string) {
G
godchen 已提交
415 416 417
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.collInfo, collectionName)
418 419
}

G
godchen 已提交
420
func (m *MetaCache) RemovePartition(ctx context.Context, collectionName, partitionName string) {
G
godchen 已提交
421 422
	m.mu.Lock()
	defer m.mu.Unlock()
C
cai.zhang 已提交
423 424 425 426 427 428 429 430 431
	_, ok := m.collInfo[collectionName]
	if !ok {
		return
	}
	partInfo := m.collInfo[collectionName].partInfo
	if partInfo == nil {
		return
	}
	delete(partInfo, partitionName)
432
}
433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522

// GetCredentialInfo returns the credential related to provided username.
// If the cache missed, proxy will try to fetch from storage.
//
// The result is always a fresh copy, never the cached entry itself. On a
// cache hit the copy is made while the read lock is held, because
// UpdateCredential rewrites cached entries in place. When the RootCoord call
// fails, an empty CredentialInfo is returned together with the error.
func (m *MetaCache) GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) {
	m.credMut.RLock()
	if cached, ok := m.credMap[username]; ok {
		snapshot := &internalpb.CredentialInfo{
			Username:          cached.Username,
			EncryptedPassword: cached.EncryptedPassword,
		}
		m.credMut.RUnlock()
		return snapshot, nil
	}
	m.credMut.RUnlock()

	req := &rootcoordpb.GetCredentialRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_GetCredential,
		},
		Username: username,
	}
	resp, err := m.client.GetCredential(ctx, req)
	if err != nil {
		return &internalpb.CredentialInfo{}, err
	}

	// UpdateCredential copies the values into the cache and takes the write
	// lock itself, so it must be called without credMut held.
	m.UpdateCredential(&internalpb.CredentialInfo{
		Username:          resp.Username,
		EncryptedPassword: resp.Password,
	})

	return &internalpb.CredentialInfo{
		Username:          resp.Username,
		EncryptedPassword: resp.Password,
	}, nil
}

// ClearCredUsers resets the cached username list to nil under the credential
// write lock, so the next GetCredUsernames call reloads it.
func (m *MetaCache) ClearCredUsers() {
	m.credMut.Lock()
	m.credUsernameList = nil
	m.credMut.Unlock()
}

// RemoveCredential deletes the cached credential of username and resets the
// cached username list to nil, both under the credential write lock.
func (m *MetaCache) RemoveCredential(username string) {
	m.credMut.Lock()
	// Drop the user's entry, then invalidate the username list.
	delete(m.credMap, username)
	m.credUsernameList = nil
	m.credMut.Unlock()
}

// UpdateCredential stores the username and encrypted password of credInfo in
// the credential cache under the write lock. An existing entry is updated in
// place; otherwise a new entry is created. The values are copied, so credInfo
// itself is not retained.
func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) {
	m.credMut.Lock()
	defer m.credMut.Unlock()

	name := credInfo.Username
	entry, found := m.credMap[name]
	if !found {
		entry = &internalpb.CredentialInfo{}
		m.credMap[name] = entry
	}
	entry.Username = name
	entry.EncryptedPassword = credInfo.EncryptedPassword
}

// UpdateCredUsersListCache replaces the cached username list with usernames
// under the credential write lock. The slice is stored as given, not copied.
func (m *MetaCache) UpdateCredUsersListCache(usernames []string) {
	m.credMut.Lock()
	m.credUsernameList = usernames
	m.credMut.Unlock()
}

// GetCredUsernames returns the list of credential usernames.
//
// A non-nil cached list is returned directly. Otherwise the list is fetched
// from RootCoord, stored in the cache and returned; a fetch error is returned
// with a nil list. The returned slice is the cached one, not a copy.
func (m *MetaCache) GetCredUsernames(ctx context.Context) ([]string, error) {
	m.credMut.RLock()
	cached := m.credUsernameList
	m.credMut.RUnlock()
	if cached != nil {
		return cached, nil
	}

	resp, err := m.client.ListCredUsers(ctx, &milvuspb.ListCredUsersRequest{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_ListCredUsernames,
		},
	})
	if err != nil {
		return nil, err
	}

	m.UpdateCredUsersListCache(resp.Usernames)
	return resp.Usernames, nil
}