提交 84b15c1f 编写于 作者: D dragondriver 提交者: yefu.chen

Fix the bug of meta cache in Proxy

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 c1fbc4f8
......@@ -31,6 +31,8 @@ ParseVecNode(Plan* plan, const Json& out_body) {
std::string field_name = iter.key();
auto& vec_info = iter.value();
auto topK = vec_info["topk"];
AssertInfo(topK > 0, "topK must greater than 0");
AssertInfo(topK < 16384, "topK is too large");
vec_node->query_info_.topK_ = topK;
vec_node->query_info_.metric_type_ = vec_info["metric_type"];
vec_node->query_info_.search_params_ = vec_info["params"];
......
......@@ -4,30 +4,26 @@ import (
"context"
"sync"
"github.com/zilliztech/milvus-distributed/internal/allocator"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
)
type Cache interface {
Hit(collectionName string) bool
Get(collectionName string) (*servicepb.CollectionDescription, error)
Update(collectionName string) error
Sync(collectionName string) error
Update(collectionName string, desc *servicepb.CollectionDescription) error
Remove(collectionName string) error
}
var globalMetaCache Cache
type SimpleMetaCache struct {
mu sync.RWMutex
proxyID UniqueID
metas map[string]*servicepb.CollectionDescription // collection name to schema
masterClient masterpb.MasterClient
reqIDAllocator *allocator.IDAllocator
tsoAllocator *allocator.TimestampAllocator
ctx context.Context
mu sync.RWMutex
metas map[string]*servicepb.CollectionDescription // collection name to schema
ctx context.Context
proxyInstance *Proxy
}
func (metaCache *SimpleMetaCache) Hit(collectionName string) bool {
......@@ -47,58 +43,34 @@ func (metaCache *SimpleMetaCache) Get(collectionName string) (*servicepb.Collect
return schema, nil
}
func (metaCache *SimpleMetaCache) Update(collectionName string) error {
reqID, err := metaCache.reqIDAllocator.AllocOne()
if err != nil {
return err
}
ts, err := metaCache.tsoAllocator.AllocOne()
if err != nil {
return err
}
hasCollectionReq := &internalpb.HasCollectionRequest{
MsgType: internalpb.MsgType_kHasCollection,
ReqID: reqID,
Timestamp: ts,
ProxyID: metaCache.proxyID,
CollectionName: &servicepb.CollectionName{
CollectionName: collectionName,
func (metaCache *SimpleMetaCache) Sync(collectionName string) error {
dct := &DescribeCollectionTask{
Condition: NewTaskCondition(metaCache.ctx),
DescribeCollectionRequest: internalpb.DescribeCollectionRequest{
MsgType: internalpb.MsgType_kDescribeCollection,
CollectionName: &servicepb.CollectionName{
CollectionName: collectionName,
},
},
masterClient: metaCache.proxyInstance.masterClient,
}
has, err := metaCache.masterClient.HasCollection(metaCache.ctx, hasCollectionReq)
if err != nil {
return err
}
if !has.Value {
return errors.New("collection " + collectionName + " not exists")
}
var cancel func()
dct.ctx, cancel = context.WithTimeout(metaCache.ctx, reqTimeoutInterval)
defer cancel()
reqID, err = metaCache.reqIDAllocator.AllocOne()
if err != nil {
return err
}
ts, err = metaCache.tsoAllocator.AllocOne()
if err != nil {
return err
}
req := &internalpb.DescribeCollectionRequest{
MsgType: internalpb.MsgType_kDescribeCollection,
ReqID: reqID,
Timestamp: ts,
ProxyID: metaCache.proxyID,
CollectionName: &servicepb.CollectionName{
CollectionName: collectionName,
},
}
resp, err := metaCache.masterClient.DescribeCollection(metaCache.ctx, req)
err := metaCache.proxyInstance.sched.DdQueue.Enqueue(dct)
if err != nil {
return err
}
return dct.WaitToFinish()
}
func (metaCache *SimpleMetaCache) Update(collectionName string, desc *servicepb.CollectionDescription) error {
metaCache.mu.Lock()
defer metaCache.mu.Unlock()
metaCache.metas[collectionName] = resp
metaCache.metas[collectionName] = desc
return nil
}
......@@ -115,23 +87,14 @@ func (metaCache *SimpleMetaCache) Remove(collectionName string) error {
return nil
}
func newSimpleMetaCache(ctx context.Context,
mCli masterpb.MasterClient,
idAllocator *allocator.IDAllocator,
tsoAllocator *allocator.TimestampAllocator) *SimpleMetaCache {
func newSimpleMetaCache(ctx context.Context, proxyInstance *Proxy) *SimpleMetaCache {
return &SimpleMetaCache{
metas: make(map[string]*servicepb.CollectionDescription),
masterClient: mCli,
reqIDAllocator: idAllocator,
tsoAllocator: tsoAllocator,
proxyID: Params.ProxyID(),
ctx: ctx,
metas: make(map[string]*servicepb.CollectionDescription),
proxyInstance: proxyInstance,
ctx: ctx,
}
}
func initGlobalMetaCache(ctx context.Context,
mCli masterpb.MasterClient,
idAllocator *allocator.IDAllocator,
tsoAllocator *allocator.TimestampAllocator) {
globalMetaCache = newSimpleMetaCache(ctx, mCli, idAllocator, tsoAllocator)
func initGlobalMetaCache(ctx context.Context, proxyInstance *Proxy) {
globalMetaCache = newSimpleMetaCache(ctx, proxyInstance)
}
......@@ -109,7 +109,7 @@ func (p *Proxy) startProxy() error {
if err != nil {
return err
}
initGlobalMetaCache(p.proxyLoopCtx, p.masterClient, p.idAllocator, p.tsoAllocator)
initGlobalMetaCache(p.proxyLoopCtx, p)
p.manipulationMsgStream.Start()
p.queryMsgStream.Start()
p.sched.Start()
......
......@@ -91,7 +91,7 @@ func (it *InsertTask) PreExecute() error {
func (it *InsertTask) Execute() error {
collectionName := it.BaseInsertTask.CollectionName
if !globalMetaCache.Hit(collectionName) {
err := globalMetaCache.Update(collectionName)
err := globalMetaCache.Sync(collectionName)
if err != nil {
return err
}
......@@ -352,7 +352,7 @@ func (qt *QueryTask) SetTs(ts Timestamp) {
func (qt *QueryTask) PreExecute() error {
collectionName := qt.query.CollectionName
if !globalMetaCache.Hit(collectionName) {
err := globalMetaCache.Update(collectionName)
err := globalMetaCache.Sync(collectionName)
if err != nil {
return err
}
......@@ -605,14 +605,9 @@ func (dct *DescribeCollectionTask) PreExecute() error {
}
func (dct *DescribeCollectionTask) Execute() error {
if !globalMetaCache.Hit(dct.CollectionName.CollectionName) {
err := globalMetaCache.Update(dct.CollectionName.CollectionName)
if err != nil {
return err
}
}
var err error
dct.result, err = globalMetaCache.Get(dct.CollectionName.CollectionName)
dct.result, err = dct.masterClient.DescribeCollection(dct.ctx, &dct.DescribeCollectionRequest)
globalMetaCache.Update(dct.CollectionName.CollectionName, dct.result)
return err
}
......
......@@ -29,7 +29,7 @@ func createPlan(col Collection, dsl string) (*Plan, error) {
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return nil, errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
return nil, errors.New("Create plan failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
var newPlan = &Plan{cPlan: cPlan}
......@@ -60,7 +60,7 @@ func parserPlaceholderGroup(plan *Plan, placeHolderBlob []byte) (*PlaceholderGro
if errorCode != 0 {
errorMsg := C.GoString(status.error_msg)
defer C.free(unsafe.Pointer(status.error_msg))
return nil, errors.New("Insert failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
return nil, errors.New("Parser placeholder group failed, C runtime error detected, error code = " + strconv.Itoa(int(errorCode)) + ", error msg = " + errorMsg)
}
var newPlaceholderGroup = &PlaceholderGroup{cPlaceholderGroup: cPlaceholderGroup}
......
......@@ -139,7 +139,7 @@ func (ss *searchService) receiveSearchMsg() {
err := ss.search(msg)
if err != nil {
log.Println(err)
err = ss.publishFailedSearchResult(msg)
err = ss.publishFailedSearchResult(msg, err.Error())
if err != nil {
log.Println("publish FailedSearchResult failed, error message: ", err)
}
......@@ -191,7 +191,7 @@ func (ss *searchService) doUnsolvedMsgSearch() {
err := ss.search(msg)
if err != nil {
log.Println(err)
err = ss.publishFailedSearchResult(msg)
err = ss.publishFailedSearchResult(msg, err.Error())
if err != nil {
log.Println("publish FailedSearchResult failed, error message: ", err)
}
......@@ -346,7 +346,7 @@ func (ss *searchService) publishSearchResult(msg msgstream.TsMsg) error {
return nil
}
func (ss *searchService) publishFailedSearchResult(msg msgstream.TsMsg) error {
func (ss *searchService) publishFailedSearchResult(msg msgstream.TsMsg, errMsg string) error {
msgPack := msgstream.MsgPack{}
searchMsg, ok := msg.(*msgstream.SearchMsg)
if !ok {
......@@ -354,7 +354,7 @@ func (ss *searchService) publishFailedSearchResult(msg msgstream.TsMsg) error {
}
var results = internalpb.SearchResult{
MsgType: internalpb.MsgType_kSearchResult,
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: errMsg},
ReqID: searchMsg.ReqID,
ProxyID: searchMsg.ProxyID,
QueryNodeID: searchMsg.ProxyID,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册