meta_cache.go 10.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

C
Cai Yudong 已提交
12
package proxy
13 14

import (
G
godchen 已提交
15
	"context"
S
sunby 已提交
16 17
	"errors"
	"fmt"
18 19
	"sync"

Z
zhenshan.cao 已提交
20 21
	"go.uber.org/zap"

22
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
23 24 25 26 27
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/typeutil"
28 29
)

30
type Cache interface {
G
godchen 已提交
31
	GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error)
32
	GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error)
G
godchen 已提交
33
	GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error)
Z
zhenshan.cao 已提交
34
	GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error)
35
	GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error)
G
godchen 已提交
36 37 38
	GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error)
	RemoveCollection(ctx context.Context, collectionName string)
	RemovePartition(ctx context.Context, collectionName string, partitionName string)
G
godchen 已提交
39 40 41
}

type collectionInfo struct {
42 43 44 45 46 47 48 49 50 51 52
	collID              typeutil.UniqueID
	schema              *schemapb.CollectionSchema
	partInfo            map[string]*partitionInfo
	createdTimestamp    uint64
	createdUtcTimestamp uint64
}

type partitionInfo struct {
	partitionID         typeutil.UniqueID
	createdTimestamp    uint64
	createdUtcTimestamp uint64
G
godchen 已提交
53 54 55
}

type MetaCache struct {
56
	client types.RootCoord
G
godchen 已提交
57 58 59

	collInfo map[string]*collectionInfo
	mu       sync.RWMutex
60 61
}

62
// globalMetaCache is the package-level Cache instance; InitMetaCache assigns it.
var globalMetaCache Cache
63

64
func InitMetaCache(client types.RootCoord) error {
G
godchen 已提交
65 66 67 68 69 70
	var err error
	globalMetaCache, err = NewMetaCache(client)
	if err != nil {
		return err
	}
	return nil
71 72
}

73
func NewMetaCache(client types.RootCoord) (*MetaCache, error) {
G
godchen 已提交
74 75 76 77
	return &MetaCache{
		client:   client,
		collInfo: map[string]*collectionInfo{},
	}, nil
78 79
}

Z
zhenshan.cao 已提交
80
func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
G
godchen 已提交
81 82
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
83

84
	if !ok {
Z
zhenshan.cao 已提交
85 86 87 88 89 90 91 92 93 94
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return 0, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
		return collInfo.collID, nil
95
	}
Z
zhenshan.cao 已提交
96 97
	defer m.mu.RUnlock()

G
godchen 已提交
98
	return collInfo.collID, nil
99 100
}

101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
// GetCollectionInfo returns a shallow copy of the cached metadata of the named
// collection, describing the collection through RootCoord on a cache miss.
//
// The copy is always taken while m.mu is held: updateCollection and
// updatePartitions mutate cached entries in place under the write lock, so
// reading the entry's fields after releasing the lock would race with them.
//
// The copy is shallow: the returned partInfo map and schema pointer are the
// same objects the cache holds.
// NOTE(review): callers should treat the returned partInfo as read-only;
// confirm whether a deep copy is wanted.
//
// It returns nil and the error when describing the collection fails.
func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string) (*collectionInfo, error) {
	m.mu.RLock()
	if cached, ok := m.collInfo[collectionName]; ok {
		snapshot := *cached
		m.mu.RUnlock()
		return &snapshot, nil
	}
	// Release the read lock before the remote call.
	m.mu.RUnlock()

	coll, err := m.describeCollection(ctx, collectionName)
	if err != nil {
		return nil, err
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	m.updateCollection(coll, collectionName)
	snapshot := *m.collInfo[collectionName]
	return &snapshot, nil
}

Z
zhenshan.cao 已提交
127
func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error) {
G
godchen 已提交
128 129
	m.mu.RLock()
	collInfo, ok := m.collInfo[collectionName]
Z
zhenshan.cao 已提交
130

G
godchen 已提交
131
	if !ok {
Z
zhenshan.cao 已提交
132 133 134 135 136 137 138 139 140 141
		m.mu.RUnlock()
		coll, err := m.describeCollection(ctx, collectionName)
		if err != nil {
			return nil, err
		}
		m.mu.Lock()
		defer m.mu.Unlock()
		m.updateCollection(coll, collectionName)
		collInfo = m.collInfo[collectionName]
		return collInfo.schema, nil
G
godchen 已提交
142
	}
Z
zhenshan.cao 已提交
143 144
	defer m.mu.RUnlock()

G
godchen 已提交
145 146 147
	return collInfo.schema, nil
}

Z
zhenshan.cao 已提交
148 149 150 151 152 153 154
func (m *MetaCache) updateCollection(coll *milvuspb.DescribeCollectionResponse, collectionName string) {
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{}
	}
	m.collInfo[collectionName].schema = coll.Schema
	m.collInfo[collectionName].collID = coll.CollectionID
155 156
	m.collInfo[collectionName].createdTimestamp = coll.CreatedTimestamp
	m.collInfo[collectionName].createdUtcTimestamp = coll.CreatedUtcTimestamp
Z
zhenshan.cao 已提交
157 158 159
}

func (m *MetaCache) GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
160
	partInfo, err := m.GetPartitionInfo(ctx, collectionName, partitionName)
Z
zhenshan.cao 已提交
161 162 163
	if err != nil {
		return 0, err
	}
164 165 166 167 168 169 170 171
	return partInfo.partitionID, nil
}

func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (map[string]typeutil.UniqueID, error) {
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
	}
Z
zhenshan.cao 已提交
172

G
godchen 已提交
173 174
	m.mu.RLock()

C
cai.zhang 已提交
175 176
	collInfo, ok := m.collInfo[collectionName]
	if !ok {
Z
zhenshan.cao 已提交
177
		m.mu.RUnlock()
178
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
C
cai.zhang 已提交
179 180
	}

181 182
	if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 {
		m.mu.RUnlock()
Z
zhenshan.cao 已提交
183 184 185

		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
186
			return nil, err
Z
zhenshan.cao 已提交
187 188 189 190
		}

		m.mu.Lock()
		defer m.mu.Unlock()
191

Z
zhenshan.cao 已提交
192 193
		m.updatePartitions(partitions, collectionName)

194
		ret := make(map[string]typeutil.UniqueID)
Z
zhenshan.cao 已提交
195
		partInfo := m.collInfo[collectionName].partInfo
196 197
		for k, v := range partInfo {
			ret[k] = v.partitionID
Z
zhenshan.cao 已提交
198
		}
199 200
		return ret, nil

G
godchen 已提交
201
	}
202 203 204 205 206 207 208 209 210
	defer m.mu.RUnlock()

	ret := make(map[string]typeutil.UniqueID)
	partInfo := m.collInfo[collectionName].partInfo
	for k, v := range partInfo {
		ret[k] = v.partitionID
	}

	return ret, nil
G
godchen 已提交
211 212
}

213
func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, partitionName string) (*partitionInfo, error) {
Z
zhenshan.cao 已提交
214 215 216
	_, err := m.GetCollectionID(ctx, collectionName)
	if err != nil {
		return nil, err
217
	}
D
dragondriver 已提交
218

Z
zhenshan.cao 已提交
219 220 221 222 223 224
	m.mu.RLock()

	collInfo, ok := m.collInfo[collectionName]
	if !ok {
		m.mu.RUnlock()
		return nil, fmt.Errorf("can't find collection name:%s", collectionName)
G
godchen 已提交
225
	}
Z
zhenshan.cao 已提交
226

227 228 229
	var partInfo *partitionInfo
	partInfo, ok = collInfo.partInfo[partitionName]
	m.mu.RUnlock()
Z
zhenshan.cao 已提交
230

231
	if !ok {
Z
zhenshan.cao 已提交
232 233 234 235 236 237 238
		partitions, err := m.showPartitions(ctx, collectionName)
		if err != nil {
			return nil, err
		}

		m.mu.Lock()
		defer m.mu.Unlock()
239
		log.Debug("proxy", zap.Any("GetPartitionID:partitions before update", partitions), zap.Any("collectionName", collectionName))
Z
zhenshan.cao 已提交
240
		m.updatePartitions(partitions, collectionName)
241
		log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
Z
zhenshan.cao 已提交
242

243 244 245
		partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
		if !ok {
			return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
Z
zhenshan.cao 已提交
246
		}
N
neza2017 已提交
247
	}
248 249 250 251 252
	return &partitionInfo{
		partitionID:         partInfo.partitionID,
		createdTimestamp:    partInfo.createdTimestamp,
		createdUtcTimestamp: partInfo.createdUtcTimestamp,
	}, nil
G
godchen 已提交
253 254
}

Z
zhenshan.cao 已提交
255
func (m *MetaCache) describeCollection(ctx context.Context, collectionName string) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
256 257
	req := &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
258
			MsgType: commonpb.MsgType_DescribeCollection,
G
godchen 已提交
259 260 261
		},
		CollectionName: collectionName,
	}
G
godchen 已提交
262
	coll, err := m.client.DescribeCollection(ctx, req)
G
godchen 已提交
263 264 265
	if err != nil {
		return nil, err
	}
266
	if coll.Status.ErrorCode != commonpb.ErrorCode_Success {
S
sunby 已提交
267
		return nil, errors.New(coll.Status.Reason)
G
godchen 已提交
268
	}
269 270 271 272 273 274 275 276 277 278 279
	resp := &milvuspb.DescribeCollectionResponse{
		Status: coll.Status,
		Schema: &schemapb.CollectionSchema{
			Name:        coll.Schema.Name,
			Description: coll.Schema.Description,
			AutoID:      coll.Schema.AutoID,
			Fields:      make([]*schemapb.FieldSchema, 0),
		},
		CollectionID:         coll.CollectionID,
		VirtualChannelNames:  coll.VirtualChannelNames,
		PhysicalChannelNames: coll.PhysicalChannelNames,
280 281
		CreatedTimestamp:     coll.CreatedTimestamp,
		CreatedUtcTimestamp:  coll.CreatedUtcTimestamp,
282 283 284 285 286 287 288
	}
	for _, field := range coll.Schema.Fields {
		if field.FieldID >= 100 { // TODO(dragondriver): use StartOfUserField to replace 100
			resp.Schema.Fields = append(resp.Schema.Fields, field)
		}
	}
	return resp, nil
289 290
}

Z
zhenshan.cao 已提交
291
func (m *MetaCache) showPartitions(ctx context.Context, collectionName string) (*milvuspb.ShowPartitionsResponse, error) {
G
godchen 已提交
292
	req := &milvuspb.ShowPartitionsRequest{
G
godchen 已提交
293
		Base: &commonpb.MsgBase{
294
			MsgType: commonpb.MsgType_ShowPartitions,
G
godchen 已提交
295 296 297
		},
		CollectionName: collectionName,
	}
Z
zhenshan.cao 已提交
298

G
godchen 已提交
299
	partitions, err := m.client.ShowPartitions(ctx, req)
G
godchen 已提交
300
	if err != nil {
Z
zhenshan.cao 已提交
301
		return nil, err
G
godchen 已提交
302
	}
303
	if partitions.Status.ErrorCode != commonpb.ErrorCode_Success {
Z
zhenshan.cao 已提交
304
		return nil, fmt.Errorf("%s", partitions.Status.Reason)
G
godchen 已提交
305
	}
B
bigsheeper 已提交
306

G
godchen 已提交
307
	if len(partitions.PartitionIDs) != len(partitions.PartitionNames) {
Z
zhenshan.cao 已提交
308
		return nil, fmt.Errorf("partition ids len: %d doesn't equal Partition name len %d",
G
godchen 已提交
309 310
			len(partitions.PartitionIDs), len(partitions.PartitionNames))
	}
C
cai.zhang 已提交
311

Z
zhenshan.cao 已提交
312 313 314 315
	return partitions, nil
}

func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse, collectionName string) {
C
cai.zhang 已提交
316 317 318
	_, ok := m.collInfo[collectionName]
	if !ok {
		m.collInfo[collectionName] = &collectionInfo{
319
			partInfo: map[string]*partitionInfo{},
C
cai.zhang 已提交
320 321 322
		}
	}
	partInfo := m.collInfo[collectionName].partInfo
G
godchen 已提交
323
	if partInfo == nil {
324
		partInfo = map[string]*partitionInfo{}
G
godchen 已提交
325
	}
326

G
godchen 已提交
327
	for i := 0; i < len(partitions.PartitionIDs); i++ {
328 329 330 331 332 333
		if _, ok := partInfo[partitions.PartitionNames[i]]; !ok {
			partInfo[partitions.PartitionNames[i]] = &partitionInfo{
				partitionID:         partitions.PartitionIDs[i],
				createdTimestamp:    partitions.CreatedTimestamps[i],
				createdUtcTimestamp: partitions.CreatedUtcTimestamps[i],
			}
G
godchen 已提交
334 335
		}
	}
Z
zhenshan.cao 已提交
336
	m.collInfo[collectionName].partInfo = partInfo
337 338
}

G
godchen 已提交
339
func (m *MetaCache) RemoveCollection(ctx context.Context, collectionName string) {
G
godchen 已提交
340 341 342
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.collInfo, collectionName)
343 344
}

G
godchen 已提交
345
func (m *MetaCache) RemovePartition(ctx context.Context, collectionName, partitionName string) {
G
godchen 已提交
346 347
	m.mu.Lock()
	defer m.mu.Unlock()
C
cai.zhang 已提交
348 349 350 351 352 353 354 355 356
	_, ok := m.collInfo[collectionName]
	if !ok {
		return
	}
	partInfo := m.collInfo[collectionName].partInfo
	if partInfo == nil {
		return
	}
	delete(partInfo, partitionName)
357
}