meta_cache.go 3.6 KB
Newer Older
1 2 3 4 5 6
package proxy

import (
	"context"
	"sync"

C
cai.zhang 已提交
7
	"github.com/zilliztech/milvus-distributed/internal/allocator"
8 9
	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
C
cai.zhang 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
11 12 13
	"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)

14
type Cache interface {
15 16
	Hit(collectionName string) bool
	Get(collectionName string) (*servicepb.CollectionDescription, error)
C
cai.zhang 已提交
17
	Update(collectionName string) error
18
	Remove(collectionName string) error
19 20
}

21
// globalMetaCache is the package-wide metadata cache. It is nil until
// initGlobalMetaCache assigns it.
var globalMetaCache Cache
22 23

type SimpleMetaCache struct {
C
cai.zhang 已提交
24 25 26 27 28 29 30
	mu             sync.RWMutex
	proxyID        UniqueID
	metas          map[string]*servicepb.CollectionDescription // collection name to schema
	masterClient   masterpb.MasterClient
	reqIDAllocator *allocator.IDAllocator
	tsoAllocator   *allocator.TimestampAllocator
	ctx            context.Context
31 32
}

33 34 35 36
func (metaCache *SimpleMetaCache) Hit(collectionName string) bool {
	metaCache.mu.RLock()
	defer metaCache.mu.RUnlock()
	_, ok := metaCache.metas[collectionName]
37 38 39
	return ok
}

40 41 42 43
func (metaCache *SimpleMetaCache) Get(collectionName string) (*servicepb.CollectionDescription, error) {
	metaCache.mu.RLock()
	defer metaCache.mu.RUnlock()
	schema, ok := metaCache.metas[collectionName]
44 45 46 47 48 49
	if !ok {
		return nil, errors.New("collection meta miss")
	}
	return schema, nil
}

C
cai.zhang 已提交
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
func (metaCache *SimpleMetaCache) Update(collectionName string) error {
	reqID, err := metaCache.reqIDAllocator.AllocOne()
	if err != nil {
		return err
	}
	ts, err := metaCache.tsoAllocator.AllocOne()
	if err != nil {
		return err
	}
	hasCollectionReq := &internalpb.HasCollectionRequest{
		MsgType:   internalpb.MsgType_kHasCollection,
		ReqID:     reqID,
		Timestamp: ts,
		ProxyID:   metaCache.proxyID,
		CollectionName: &servicepb.CollectionName{
			CollectionName: collectionName,
66 67
		},
	}
C
cai.zhang 已提交
68
	has, err := metaCache.masterClient.HasCollection(metaCache.ctx, hasCollectionReq)
69 70 71
	if err != nil {
		return err
	}
C
cai.zhang 已提交
72 73 74
	if !has.Value {
		return errors.New("collection " + collectionName + " not exists")
	}
75

C
cai.zhang 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
	reqID, err = metaCache.reqIDAllocator.AllocOne()
	if err != nil {
		return err
	}
	ts, err = metaCache.tsoAllocator.AllocOne()
	if err != nil {
		return err
	}
	req := &internalpb.DescribeCollectionRequest{
		MsgType:   internalpb.MsgType_kDescribeCollection,
		ReqID:     reqID,
		Timestamp: ts,
		ProxyID:   metaCache.proxyID,
		CollectionName: &servicepb.CollectionName{
			CollectionName: collectionName,
		},
	}
	resp, err := metaCache.masterClient.DescribeCollection(metaCache.ctx, req)
	if err != nil {
		return err
	}
97

98 99
	metaCache.mu.Lock()
	defer metaCache.mu.Unlock()
C
cai.zhang 已提交
100
	metaCache.metas[collectionName] = resp
101 102 103 104 105 106 107 108 109 110 111 112 113

	return nil
}

func (metaCache *SimpleMetaCache) Remove(collectionName string) error {
	metaCache.mu.Lock()
	defer metaCache.mu.Unlock()

	_, ok := metaCache.metas[collectionName]
	if !ok {
		return errors.New("cannot find collection: " + collectionName)
	}
	delete(metaCache.metas, collectionName)
114 115 116 117

	return nil
}

C
cai.zhang 已提交
118 119 120 121
func newSimpleMetaCache(ctx context.Context,
	mCli masterpb.MasterClient,
	idAllocator *allocator.IDAllocator,
	tsoAllocator *allocator.TimestampAllocator) *SimpleMetaCache {
122
	return &SimpleMetaCache{
C
cai.zhang 已提交
123 124 125 126 127 128
		metas:          make(map[string]*servicepb.CollectionDescription),
		masterClient:   mCli,
		reqIDAllocator: idAllocator,
		tsoAllocator:   tsoAllocator,
		proxyID:        Params.ProxyID(),
		ctx:            ctx,
129 130 131
	}
}

C
cai.zhang 已提交
132 133 134 135 136
// initGlobalMetaCache creates a SimpleMetaCache from the given dependencies
// and installs it as globalMetaCache, replacing any previous value.
func initGlobalMetaCache(
	ctx context.Context,
	mCli masterpb.MasterClient,
	idAllocator *allocator.IDAllocator,
	tsoAllocator *allocator.TimestampAllocator,
) {
	cache := newSimpleMetaCache(ctx, mCli, idAllocator, tsoAllocator)
	globalMetaCache = cache
}