// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package proxy
18 19

import (
G
godchen 已提交
20
	"context"
S
sunby 已提交
21 22
	"errors"
	"fmt"
23
	"strconv"
24
	"sync"
25
	"time"
26

Z
zhenshan.cao 已提交
27 28
	"go.uber.org/zap"

29
	"github.com/milvus-io/milvus/internal/common"
30
	"github.com/milvus-io/milvus/internal/log"
31
	"github.com/milvus-io/milvus/internal/metrics"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/proto/commonpb"
33
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
34
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
35
	"github.com/milvus-io/milvus/internal/proto/querypb"
36
	"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
X
Xiangyu Wang 已提交
37 38
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/types"
39
	"github.com/milvus-io/milvus/internal/util/retry"
40
	"github.com/milvus-io/milvus/internal/util/timerecord"
X
Xiangyu Wang 已提交
41
	"github.com/milvus-io/milvus/internal/util/typeutil"
42 43
)

44
// Cache is the interface for system meta data cache
45
type Cache interface {
46
	// GetCollectionID get collection's id by name.
G
godchen 已提交
47
	GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error)
48
	// GetCollectionInfo get collection's information by name, such as collection id, schema, and etc.
49
	GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error)
50
	// GetPartitionID get partition's identifier of specific collection.
G
godchen 已提交
51
	GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error)
52
	// GetPartitions get all partitions' id of specific collection.
Z
zhenshan.cao 已提交
53
	GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error)
54
	// GetPartitionInfo get partition's info.
55
	GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error)
56
	// GetCollectionSchema get collection's schema.
G
godchen 已提交
57
	GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error)
58
	GetShards(ctx context.Context, withCache bool, collectionName string, qc types.QueryCoord) (map[string][]queryNode, error)
59
	ClearShards(collectionName string)
G
godchen 已提交
60
	RemoveCollection(ctx context.Context, collectionName string)
61
	RemoveCollectionsByID(ctx context.Context, collectionID UniqueID)
G
godchen 已提交
62
	RemovePartition(ctx context.Context, collectionName string, partitionName string)
63 64 65 66 67 68 69

	// GetCredentialInfo operate credential cache
	GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error)
	RemoveCredential(username string)
	UpdateCredential(credInfo *internalpb.CredentialInfo)
	GetCredUsernames(ctx context.Context) ([]string, error)
	ClearCredUsers()
G
godchen 已提交
70 71 72
}

type collectionInfo struct {
73 74 75
	collID              typeutil.UniqueID
	schema              *schemapb.CollectionSchema
	partInfo            map[string]*partitionInfo
76
	shardLeaders        map[string][]queryNode
77
	leaderMutex         sync.Mutex
78 79
	createdTimestamp    uint64
	createdUtcTimestamp uint64
80
	isLoaded            bool
81 82
}

83 84 85 86 87 88 89 90 91 92 93 94
// CloneShardLeaders returns a copy of shard leaders
// leaderMutex shall be accuired before invoking this method
func (c *collectionInfo) CloneShardLeaders() map[string][]queryNode {
	m := make(map[string][]queryNode)
	for channel, leaders := range c.shardLeaders {
		l := make([]queryNode, len(leaders))
		copy(l, leaders)
		m[channel] = l
	}
	return m
}

95 96 97 98
type partitionInfo struct {
	partitionID         typeutil.UniqueID
	createdTimestamp    uint64
	createdUtcTimestamp uint64
G
godchen 已提交
99 100
}

101
// make sure MetaCache implements Cache.
102 103
var _ Cache = (*MetaCache)(nil)

104
// MetaCache implements Cache, provides collection meta cache based on internal RootCoord
G
godchen 已提交
105
type MetaCache struct {
106 107
	rootCoord  types.RootCoord
	queryCoord types.QueryCoord
G
godchen 已提交
108

109 110 111 112 113
	collInfo         map[string]*collectionInfo
	credMap          map[string]*internalpb.CredentialInfo // cache for credential, lazy load
	credUsernameList []string                              // no need initialize when NewMetaCache
	mu               sync.RWMutex
	credMut          sync.RWMutex
114 115
}

116
// globalMetaCache is singleton instance of Cache
117
var globalMetaCache Cache
118

119
// InitMetaCache initializes globalMetaCache
120
func InitMetaCache(rootCoord types.RootCoord, queryCoord types.QueryCoord) error {
G
godchen 已提交
121
	var err error
122
	globalMetaCache, err = NewMetaCache(rootCoord, queryCoord)
G
godchen 已提交
123 124 125 126
	if err != nil {
		return err
	}
	return nil
127 128
}

129 130
// NewMetaCache creates a MetaCache with provided RootCoord and QueryNode
func NewMetaCache(rootCoord types.RootCoord, queryCoord types.QueryCoord) (*MetaCache, error) {
G
godchen 已提交
131
	return &MetaCache{
132 133 134 135
		rootCoord:  rootCoord,
		queryCoord: queryCoord,
		collInfo:   map[string]*collectionInfo{},
		credMap:    map[string]*internalpb.CredentialInfo{},
G
godchen 已提交
136
	}, nil
137 138
}

139
// GetCollectionID returns the corresponding collection id for provided collection name
Z
zhenshan.cao 已提交
140
func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
G
godchen 已提交
141 142
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
143

144
	if !ok {
X
Xiaofan 已提交
145
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc()
146
		tr := timerecord.NewTimeRecorder("UpdateCache")
Z
zhenshan.cao 已提交
147 148 149 150 151 152 153 154
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return 0, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
X
Xiaofan 已提交
155
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
Z
zhenshan.cao 已提交
156 157
		collInfo = m.collInfo[collectionName]
		return collInfo.collID, nil
158
	}
Z
zhenshan.cao 已提交
159
	defer m.mu.RUnlock()
X
Xiaofan 已提交
160
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionID", metrics.CacheHitLabel).Inc()
Z
zhenshan.cao 已提交
161

G
godchen 已提交
162
	return collInfo.collID, nil
163 164
}

165 166
// GetCollectionInfo returns the collection information related to provided collection name
// If the information is not found, proxy will try to fetch information for other source (RootCoord for now)
167 168 169 170 171 172 173
func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error) {
	m.mu.RLock()
	var collInfo *collectionInfo
	collInfo, ok := m.collInfo[collectionName]
	m.mu.RUnlock()

	if !ok {
174
		tr := timerecord.NewTimeRecorder("UpdateCache")
X
Xiaofan 已提交
175
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc()
176 177 178 179 180 181 182
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return nil, err
		}
		m.mu.Lock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
183
		m.mu.Unlock()
X
Xiaofan 已提交
184
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
185
	}
186

187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
	if !collInfo.isLoaded {
		// check if collection was loaded
		showResp, err := m.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_ShowCollections,
				SourceID: Params.ProxyCfg.GetNodeID(),
			},
		})
		if err != nil {
			return nil, err
		}
		if showResp.Status.ErrorCode != commonpb.ErrorCode_Success {
			return nil, errors.New(showResp.Status.Reason)
		}
		log.Debug("QueryCoord show collections",
			zap.Int64("collID", collInfo.collID),
			zap.Int64s("collections", showResp.GetCollectionIDs()),
			zap.Int64s("collectionsInMemoryPercentages", showResp.GetInMemoryPercentages()),
		)
		loaded := false
		for index, collID := range showResp.CollectionIDs {
			if collID == collInfo.collID && showResp.GetInMemoryPercentages()[index] >= int64(100) {
				loaded = true
				break
			}
		}
		if loaded {
			m.mu.Lock()
			m.collInfo[collectionName].isLoaded = true
			m.mu.Unlock()
		}
	}

X
Xiaofan 已提交
220
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
221
	return collInfo, nil
222 223
}

Z
zhenshan.cao 已提交
224
func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error) {
G
godchen 已提交
225 226
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
227

G
godchen 已提交
228
	if !ok {
X
Xiaofan 已提交
229
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc()
230
		tr := timerecord.NewTimeRecorder("UpdateCache")
Z
zhenshan.cao 已提交
231 232 233
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
234 235 236
			log.Warn("Failed to load collection from rootcoord ",
				zap.String("collection name ", collectionName),
				zap.Error(err))
Z
zhenshan.cao 已提交
237 238 239 240 241 242
			return nil, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
X
Xiaofan 已提交
243
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
244
		log.Debug("Reload collection from root coordinator ",
245
			zap.String("collection name ", collectionName),
246
			zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
Z
zhenshan.cao 已提交
247
		return collInfo.schema, nil
G
godchen 已提交
248
	}
Z
zhenshan.cao 已提交
249
	defer m.mu.RUnlock()
X
Xiaofan 已提交
250
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc()
Z
zhenshan.cao 已提交
251

G
godchen 已提交
252 253 254
	return collInfo.schema, nil
}

Z
zhenshan.cao 已提交
255 256 257 258 259 260 261
func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse, collectionName string) {
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{}
	}
	m.collInfo[collectionName].schema = coll.Schema
	m.collInfo[collectionName].collID = coll.CollectionID
262 263
	m.collInfo[collectionName].createdTimestamp = coll.CreatedTimestamp
	m.collInfo[collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
Z
zhenshan.cao 已提交
264 265 266
}

func (m *MetaCache) GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
267
	partInfo, err := m.GetPartitionInfo(ctx, collectionName, partitionName)
Z
zhenshan.cao 已提交
268 269 270
	if err != nil {
		return 0, err
	}
271 272 273 274 275 276 277 278
	return partInfo.partitionID, nil
}

func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error) {
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
	}
Z
zhenshan.cao 已提交
279

G
godchen 已提交
280 281
	m.mu.RLock()

C
cai.zhang 已提交
282 283
	collInfo, ok := m.collInfo[collectionName]
	if !ok {
Z
zhenshan.cao 已提交
284
		m.mu.RUnlock()
285
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
C
cai.zhang 已提交
286 287
	}

288
	if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 {
289
		tr := timerecord.NewTimeRecorder("UpdateCache")
X
Xiaofan 已提交
290
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc()
291
		m.mu.RUnlock()
Z
zhenshan.cao 已提交
292 293 294

		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
295
			return nil, err
Z
zhenshan.cao 已提交
296 297 298 299
		}

		m.mu.Lock()
		defer m.mu.Unlock()
300

301 302 303 304
		err = m.updatePartitions(partitions, collectionName)
		if err != nil {
			return nil, err
		}
X
Xiaofan 已提交
305
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
306
		log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.Any("collectionName", collectionName))
307
		ret := make(map[string]typeutil.UniqueID)
Z
zhenshan.cao 已提交
308
		partInfo := m.collInfo[collectionName].partInfo
309 310
		for k, v := range partInfo {
			ret[k] = v.partitionID
Z
zhenshan.cao 已提交
311
		}
312 313
		return ret, nil

G
godchen 已提交
314
	}
315
	defer m.mu.RUnlock()
X
Xiaofan 已提交
316
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc()
317 318 319 320 321 322 323 324

	ret := make(map[string]typeutil.UniqueID)
	partInfo := m.collInfo[collectionName].partInfo
	for k, v := range partInfo {
		ret[k] = v.partitionID
	}

	return ret, nil
G
godchen 已提交
325 326
}

327
func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error) {
Z
zhenshan.cao 已提交
328 329 330
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
331
	}
D
dragondriver 已提交
332

Z
zhenshan.cao 已提交
333 334 335 336 337 338
	m.mu.RLock()

	collInfo, ok := m.collInfo[collectionName]
	if !ok {
		m.mu.RUnlock()
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
G
godchen 已提交
339
	}
Z
zhenshan.cao 已提交
340

341 342 343
	var partInfo *partitionInfo
	partInfo, ok = collInfo.partInfo[partitionName]
	m.mu.RUnlock()
Z
zhenshan.cao 已提交
344

345
	if !ok {
346
		tr := timerecord.NewTimeRecorder("UpdateCache")
X
Xiaofan 已提交
347
		metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc()
Z
zhenshan.cao 已提交
348 349 350 351 352 353 354
		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
			return nil, err
		}

		m.mu.Lock()
		defer m.mu.Unlock()
355 356 357 358
		err = m.updatePartitions(partitions, collectionName)
		if err != nil {
			return nil, err
		}
X
Xiaofan 已提交
359
		metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds()))
360 361 362 363
		log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
		partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
		if !ok {
			return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
Z
zhenshan.cao 已提交
364
		}
N
neza2017 已提交
365
	}
X
Xiaofan 已提交
366
	metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()
367 368 369 370 371
	return &partitionInfo{
		partitionID:         partInfo.partitionID,
		createdTimestamp:    partInfo.createdTimestamp,
		createdUtcTimestamp: partInfo.createdUtcTimestamp,
	}, nil
G
godchen 已提交
372 373
}

374
// Get the collection information from rootcoord.
Z
zhenshan.cao 已提交
375
func (m *MetaCache) describeCollection(ctx context.Context, collectionName string) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
376 377
	req := &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
378
			MsgType: commonpb.MsgType_DescribeCollection,
G
godchen 已提交
379 380 381
		},
		CollectionName: collectionName,
	}
382
	coll, err := m.rootCoord.DescribeCollection(ctx, req)
G
godchen 已提交
383 384 385
	if err != nil {
		return nil, err
	}
386
	if coll.Status.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
387
		return nil, errors.New(coll.Status.Reason)
G
godchen 已提交
388
	}
389 390 391 392 393 394 395 396 397 398 399
	resp := &milvuspb.DescribeCollectionResponse{
		Status: coll.Status,
		Schema: &schemapb.CollectionSchema{
			Name:        coll.Schema.Name,
			Description: coll.Schema.Description,
			AutoID:      coll.Schema.AutoID,
			Fields:      make([]*schemapb.FieldSchema, 0),
		},
		CollectionID:         coll.CollectionID,
		VirtualChannelNames:  coll.VirtualChannelNames,
		PhysicalChannelNames: coll.PhysicalChannelNames,
400 401
		CreatedTimestamp:     coll.CreatedTimestamp,
		CreatedUtcTimestamp:  coll.CreatedUtcTimestamp,
402 403
	}
	for _, field := range coll.Schema.Fields {
404
		if field.FieldID >= common.StartOfUserFieldID {
405 406 407 408
			resp.Schema.Fields = append(resp.Schema.Fields, field)
		}
	}
	return resp, nil
409 410
}

Z
zhenshan.cao 已提交
411
func (m *MetaCache) showPartitions(ctx context.Context, collectionName string) (*milvuspb.ShowPartitionsResponse, error) {
G
godchen 已提交
412
	req := &milvuspb.ShowPartitionsRequest{
G
godchen 已提交
413
		Base: &commonpb.MsgBase{
414
			MsgType: commonpb.MsgType_ShowPartitions,
G
godchen 已提交
415 416 417
		},
		CollectionName: collectionName,
	}
Z
zhenshan.cao 已提交
418

419
	partitions, err := m.rootCoord.ShowPartitions(ctx, req)
G
godchen 已提交
420
	if err != nil {
Z
zhenshan.cao 已提交
421
		return nil, err
G
godchen 已提交
422
	}
423
	if partitions.Status.ErrorCode != commonpb.ErrorCode_Success {
Z
zhenshan.cao 已提交
424
		return nil, fmt.Errorf("%s", partitions.Status.Reason)
G
godchen 已提交
425
	}
B
bigsheeper 已提交
426

G
godchen 已提交
427
	if len(partitions.PartitionIDs) != len(partitions.PartitionNames) {
Z
zhenshan.cao 已提交
428
		return nil, fmt.Errorf("partition ids len: %d doesn't equal Partition name len %d",
G
godchen 已提交
429 430
			len(partitions.PartitionIDs), len(partitions.PartitionNames))
	}
C
cai.zhang 已提交
431

Z
zhenshan.cao 已提交
432 433 434
	return partitions, nil
}

435
func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse, collectionName string) error {
C
cai.zhang 已提交
436 437 438
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{
439
			partInfo: map[string]*partitionInfo{},
C
cai.zhang 已提交
440 441 442
		}
	}
	partInfo := m.collInfo[collectionName].partInfo
G
godchen 已提交
443
	if partInfo == nil {
444
		partInfo = map[string]*partitionInfo{}
G
godchen 已提交
445
	}
446

447 448 449 450 451
	// check partitionID, createdTimestamp and utcstamp has sam element numbers
	if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
		return errors.New("partition names and timestamps number is not aligned, response " + partitions.String())
	}

G
godchen 已提交
452
	for i := 0; i < len(partitions.PartitionIDs); i++ {
453 454 455 456 457 458
		if _, ok := partInfo[partitions.PartitionNames[i]]; !ok {
			partInfo[partitions.PartitionNames[i]] = &partitionInfo{
				partitionID:         partitions.PartitionIDs[i],
				createdTimestamp:    partitions.CreatedTimestamps[i],
				createdUtcTimestamp: partitions.CreatedUtcTimestamps[i],
			}
G
godchen 已提交
459 460
		}
	}
Z
zhenshan.cao 已提交
461
	m.collInfo[collectionName].partInfo = partInfo
462
	return nil
463 464
}

G
godchen 已提交
465
func (m *MetaCache) RemoveCollection(ctx context.Context, collectionName string) {
G
godchen 已提交
466 467 468
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.collInfo, collectionName)
469 470
}

471 472 473 474 475 476 477 478 479 480
func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for k, v := range m.collInfo {
		if v.collID == collectionID {
			delete(m.collInfo, k)
		}
	}
}

G
godchen 已提交
481
func (m *MetaCache) RemovePartition(ctx context.Context, collectionName, partitionName string) {
G
godchen 已提交
482 483
	m.mu.Lock()
	defer m.mu.Unlock()
C
cai.zhang 已提交
484 485 486 487 488 489 490 491 492
	_, ok := m.collInfo[collectionName]
	if !ok {
		return
	}
	partInfo := m.collInfo[collectionName].partInfo
	if partInfo == nil {
		return
	}
	delete(partInfo, partitionName)
493
}
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509

// GetCredentialInfo returns the credential related to provided username
// If the cache missed, proxy will try to fetch from storage
func (m *MetaCache) GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) {
	m.credMut.RLock()
	var credInfo *internalpb.CredentialInfo
	credInfo, ok := m.credMap[username]
	m.credMut.RUnlock()

	if !ok {
		req := &rootcoordpb.GetCredentialRequest{
			Base: &commonpb.MsgBase{
				MsgType: commonpb.MsgType_GetCredential,
			},
			Username: username,
		}
510
		resp, err := m.rootCoord.GetCredential(ctx, req)
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573
		if err != nil {
			return &internalpb.CredentialInfo{}, err
		}
		credInfo = &internalpb.CredentialInfo{
			Username:          resp.Username,
			EncryptedPassword: resp.Password,
		}
		m.UpdateCredential(credInfo)
	}

	return &internalpb.CredentialInfo{
		Username:          credInfo.Username,
		EncryptedPassword: credInfo.EncryptedPassword,
	}, nil
}

// ClearCredUsers invalidates the cached username list so the next
// GetCredUsernames call reloads it.
func (m *MetaCache) ClearCredUsers() {
	m.credMut.Lock()
	m.credUsernameList = nil
	m.credMut.Unlock()
}

// RemoveCredential evicts the cached credential of username and invalidates
// the cached username list.
func (m *MetaCache) RemoveCredential(username string) {
	m.credMut.Lock()
	delete(m.credMap, username)
	m.credUsernameList = nil
	m.credMut.Unlock()
}

// UpdateCredential stores credInfo's username and encrypted password in the
// credential cache. An existing entry for that username is overwritten in
// place; the credInfo pointer itself is never retained.
func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) {
	m.credMut.Lock()
	defer m.credMut.Unlock()
	entry, found := m.credMap[credInfo.Username]
	if !found {
		entry = &internalpb.CredentialInfo{}
		m.credMap[credInfo.Username] = entry
	}
	entry.Username = credInfo.Username
	entry.EncryptedPassword = credInfo.EncryptedPassword
}

// UpdateCredUsersListCache replaces the cached username list. The slice is
// stored as given, without copying.
func (m *MetaCache) UpdateCredUsersListCache(usernames []string) {
	m.credMut.Lock()
	m.credUsernameList = usernames
	m.credMut.Unlock()
}

func (m *MetaCache) GetCredUsernames(ctx context.Context) ([]string, error) {
	m.credMut.RLock()
	usernames := m.credUsernameList
	m.credMut.RUnlock()

	if usernames == nil {
		req := &milvuspb.ListCredUsersRequest{
			Base: &commonpb.MsgBase{
				MsgType: commonpb.MsgType_ListCredUsernames,
			},
		}
574
		resp, err := m.rootCoord.ListCredUsers(ctx, req)
575 576 577 578 579 580 581 582 583
		if err != nil {
			return nil, err
		}
		usernames = resp.Usernames
		m.UpdateCredUsersListCache(usernames)
	}

	return usernames, nil
}
584 585

// GetShards update cache if withCache == false
586
func (m *MetaCache) GetShards(ctx context.Context, withCache bool, collectionName string, qc types.QueryCoord) (map[string][]queryNode, error) {
587 588 589 590 591 592 593
	info, err := m.GetCollectionInfo(ctx, collectionName)
	if err != nil {
		return nil, err
	}

	if withCache {
		if len(info.shardLeaders) > 0 {
594 595
			info.leaderMutex.Lock()
			updateShardsWithRoundRobin(info.shardLeaders)
596

597 598
			shards := info.CloneShardLeaders()
			info.leaderMutex.Unlock()
599
			return shards, nil
600 601 602 603 604 605 606 607
		}
		log.Info("no shard cache for collection, try to get shard leaders from QueryCoord",
			zap.String("collectionName", collectionName))
	}

	req := &querypb.GetShardLeadersRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_GetShardLeaders,
X
Xiaofan 已提交
608
			SourceID: Params.ProxyCfg.GetNodeID(),
609 610 611
		},
		CollectionID: info.collID,
	}
612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630

	// retry until service available or context timeout
	var resp *querypb.GetShardLeadersResponse
	childCtx, cancel := context.WithTimeout(ctx, time.Second*10)
	defer cancel()
	err = retry.Do(childCtx, func() error {
		resp, err = qc.GetShardLeaders(ctx, req)
		if err != nil {
			return retry.Unrecoverable(err)
		}
		if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
			return nil
		}
		// do not retry unless got NoReplicaAvailable from querycoord
		if resp.Status.ErrorCode != commonpb.ErrorCode_NoReplicaAvailable {
			return retry.Unrecoverable(fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason))
		}
		return fmt.Errorf("fail to get shard leaders from QueryCoord: %s", resp.Status.Reason)
	})
631
	if err != nil {
632
		return nil, fmt.Errorf("GetShardLeaders timeout, error: %s", err.Error())
633 634
	}

635 636
	shards := parseShardLeaderList2QueryNode(resp.GetShards())

637 638 639 640 641 642 643 644
	// manipulate info in map, get map returns a copy of the information
	m.mu.RLock()
	defer m.mu.RUnlock()
	info = m.collInfo[collectionName]
	// lock leader
	info.leaderMutex.Lock()
	defer info.leaderMutex.Unlock()
	info.shardLeaders = shards
645

646
	return info.CloneShardLeaders(), nil
647
}
648

649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664
func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) map[string][]queryNode {
	shard2QueryNodes := make(map[string][]queryNode)

	for _, leaders := range shardsLeaders {
		qns := make([]queryNode, len(leaders.GetNodeIds()))

		for j := range qns {
			qns[j] = queryNode{leaders.GetNodeIds()[j], leaders.GetNodeAddrs()[j]}
		}

		shard2QueryNodes[leaders.GetChannelName()] = qns
	}

	return shard2QueryNodes
}

665 666 667 668 669 670 671 672 673 674 675 676 677
// ClearShards clear the shard leader cache of a collection
func (m *MetaCache) ClearShards(collectionName string) {
	log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.collInfo[collectionName]

	if !ok {
		return
	}

	m.collInfo[collectionName].shardLeaders = nil
}