未验证 提交 619c4393 编写于 作者: X Xiaofan 提交者: GitHub

Refine logs in proxy search path(#7357) (#7753)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 b4dd7b09
......@@ -52,4 +52,5 @@ require (
replace (
github.com/apache/pulsar-client-go => github.com/apache/pulsar-client-go v0.5.0
google.golang.org/grpc => google.golang.org/grpc v1.38.0
github.com/keybase/go-keychain => github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4
)
......@@ -16,6 +16,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"go.uber.org/zap"
......@@ -129,15 +130,22 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName stri
collInfo, ok := m.collInfo[collectionName]
if !ok {
t0 := time.Now()
m.mu.RUnlock()
coll, err := m.describeCollection(ctx, collectionName)
if err != nil {
log.Warn("Failed to load collection from rootcoord ",
zap.String("collection name ", collectionName),
zap.Error(err))
return nil, err
}
m.mu.Lock()
defer m.mu.Unlock()
m.updateCollection(coll, collectionName)
collInfo = m.collInfo[collectionName]
log.Debug("Reload collection from rootcoord ",
zap.String("collection name ", collectionName),
zap.Any("time take ", time.Since(t0)))
return collInfo.schema, nil
}
defer m.mu.RUnlock()
......@@ -189,8 +197,11 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m
m.mu.Lock()
defer m.mu.Unlock()
m.updatePartitions(partitions, collectionName)
err = m.updatePartitions(partitions, collectionName)
if err != nil {
return nil, err
}
log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.Any("collectionName", collectionName))
ret := make(map[string]typeutil.UniqueID)
partInfo := m.collInfo[collectionName].partInfo
for k, v := range partInfo {
......@@ -236,8 +247,10 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string,
m.mu.Lock()
defer m.mu.Unlock()
log.Debug("proxy", zap.Any("GetPartitionID:partitions before update", partitions), zap.Any("collectionName", collectionName))
m.updatePartitions(partitions, collectionName)
err = m.updatePartitions(partitions, collectionName)
if err != nil {
return nil, err
}
log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
......@@ -312,7 +325,7 @@ func (m *MetaCache) showPartitions(ctx context.Context, collectionName string) (
return partitions, nil
}
func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse, collectionName string) {
func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse, collectionName string) error {
_, ok := m.collInfo[collectionName]
if !ok {
m.collInfo[collectionName] = &collectionInfo{
......@@ -324,6 +337,11 @@ func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse
partInfo = map[string]*partitionInfo{}
}
// check that partition names, created timestamps and created UTC timestamps have the same number of elements
if len(partitions.PartitionNames) != len(partitions.CreatedTimestamps) || len(partitions.PartitionNames) != len(partitions.CreatedUtcTimestamps) {
return errors.New("partition names and timestamps number is not aligned, response " + partitions.String())
}
for i := 0; i < len(partitions.PartitionIDs); i++ {
if _, ok := partInfo[partitions.PartitionNames[i]]; !ok {
partInfo[partitions.PartitionNames[i]] = &partitionInfo{
......@@ -334,6 +352,7 @@ func (m *MetaCache) updatePartitions(partitions *milvuspb.ShowPartitionsResponse
}
}
m.collInfo[collectionName].partInfo = partInfo
return nil
}
func (m *MetaCache) RemoveCollection(ctx context.Context, collectionName string) {
......
......@@ -11,44 +11,80 @@
package proxy
/*
import (
"context"
"errors"
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/stretchr/testify/assert"
)
// MockRootCoordClientInterface is a test double that embeds types.RootCoord and
// provides canned ShowPartitions and DescribeCollection responses.
type MockRootCoordClientInterface struct {
types.RootCoord
// Error, when true, makes the mocked RPCs return a "mocked error".
Error bool
// AccessCount is incremented by each DescribeCollection call that gets past the Error check.
AccessCount int
}
// ShowPartitions is the mocked ShowPartitions RPC. It returns a "mocked error"
// when m.Error is set; otherwise it returns a canned response chosen by
// in.CollectionName.
// NOTE(review): this span appears to be rendered from a diff, so some literals
// below list the same field twice (old and new lines shown together) — confirm
// against the committed file before compiling.
func (m *MockRootCoordClientInterface) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
if m.Error {
return nil, errors.New("mocked error")
}
// "collection1": partitions par1/par2 with IDs 1 and 2.
if in.CollectionName == "collection1" {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
PartitionIDs: []typeutil.UniqueID{1, 2},
PartitionNames: []string{"par1", "par2"},
PartitionIDs: []typeutil.UniqueID{1, 2},
CreatedTimestamps: []uint64{100, 200},
CreatedUtcTimestamps: []uint64{100, 200},
PartitionNames: []string{"par1", "par2"},
}, nil
}
// "collection2": partitions par1/par2 with IDs 3 and 4.
if in.CollectionName == "collection2" {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
PartitionIDs: []typeutil.UniqueID{3, 4},
CreatedTimestamps: []uint64{201, 202},
CreatedUtcTimestamps: []uint64{201, 202},
PartitionNames: []string{"par1", "par2"},
}, nil
}
// "errorCollection": deliberately misaligned response — two names and IDs but
// only one entry in each timestamp slice.
if in.CollectionName == "errorCollection" {
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
PartitionIDs: []typeutil.UniqueID{5, 6},
CreatedTimestamps: []uint64{201},
CreatedUtcTimestamps: []uint64{201},
PartitionNames: []string{"par1", "par2"},
}, nil
}
// Any other collection name: empty partition lists and a nil Go error.
return &milvuspb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
PartitionIDs: []typeutil.UniqueID{},
PartitionNames: []string{},
PartitionIDs: []typeutil.UniqueID{},
CreatedTimestamps: []uint64{},
CreatedUtcTimestamps: []uint64{},
PartitionNames: []string{},
}, nil
}
func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
if m.Error {
return nil, errors.New("mocked error")
}
m.AccessCount++
if in.CollectionName == "collection1" {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
......@@ -60,15 +96,40 @@ func (m *MockRootCoordClientInterface) DescribeCollection(ctx context.Context, i
},
}, nil
}
if in.CollectionName == "collection2" {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionID: typeutil.UniqueID(2),
Schema: &schemapb.CollectionSchema{
AutoID: true,
},
}, nil
}
if in.CollectionName == "errorCollection" {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
CollectionID: typeutil.UniqueID(3),
Schema: &schemapb.CollectionSchema{
AutoID: true,
},
}, nil
}
err := fmt.Errorf("can't find collection: " + in.CollectionName)
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
ErrorCode: commonpb.ErrorCode_CollectionNotExists,
Reason: "describe collection failed: " + err.Error(),
},
CollectionID: typeutil.UniqueID(0),
Schema: nil,
Schema: nil,
}, nil
}
// Simulate the cache-hit path and the cache-miss (remote root coord) path.
func TestMetaCache_GetCollection(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
......@@ -78,15 +139,82 @@ func TestMetaCache_GetCollection(t *testing.T) {
id, err := globalMetaCache.GetCollectionID(ctx, "collection1")
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(1))
assert.Equal(t, client.AccessCount, 1)
// shouldn't access the remote root coord.
schema, err := globalMetaCache.GetCollectionSchema(ctx, "collection1")
assert.Equal(t, client.AccessCount, 1)
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
})
id, err = globalMetaCache.GetCollectionID(ctx, "collection2")
assert.NotNil(t, err)
assert.Equal(t, id, typeutil.UniqueID(0))
assert.Equal(t, client.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(2))
schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection2")
assert.Equal(t, client.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
})
// test to get from cache, this should not trigger a root coord request
id, err = globalMetaCache.GetCollectionID(ctx, "collection1")
assert.Equal(t, client.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(1))
schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection1")
assert.Equal(t, client.AccessCount, 2)
assert.Nil(t, err)
assert.Equal(t, schema, &schemapb.CollectionSchema{
AutoID: true,
Fields: []*schemapb.FieldSchema{},
})
}
// TestMetaCache_GetCollectionFailure checks that a failing root coord lookup is
// reported to the caller, and that a schema fetched successfully afterwards is
// still served once root coord starts failing again.
func TestMetaCache_GetCollectionFailure(t *testing.T) {
	ctx := context.Background()
	client := &MockRootCoordClientInterface{}
	err := InitMetaCache(client)
	assert.Nil(t, err)

	// Root coord fails while nothing is cached: the error must surface.
	client.Error = true
	schema, err := globalMetaCache.GetCollectionSchema(ctx, "collection1")
	assert.NotNil(t, err)
	assert.Nil(t, schema)

	// Root coord recovers: the schema is fetched and stored in the cache.
	client.Error = false
	schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection1")
	assert.Nil(t, err)
	assert.Equal(t, schema, &schemapb.CollectionSchema{
		AutoID: true,
		Fields: []*schemapb.FieldSchema{},
	})

	// Root coord fails again. The lookup has to be repeated here; otherwise the
	// assertions below would only re-check the stale values from the previous
	// call and the cached path would never be exercised.
	client.Error = true
	schema, err = globalMetaCache.GetCollectionSchema(ctx, "collection1")
	// should be cached with no error
	assert.Nil(t, err)
	assert.Equal(t, schema, &schemapb.CollectionSchema{
		AutoID: true,
		Fields: []*schemapb.FieldSchema{},
	})
}
// TestMetaCache_GetNonExistCollection verifies that looking up a collection the
// mocked root coord does not know about yields an error together with zero
// values for both the collection ID and the schema.
func TestMetaCache_GetNonExistCollection(t *testing.T) {
	const missing = "collection3"

	rootCoord := &MockRootCoordClientInterface{}
	initErr := InitMetaCache(rootCoord)
	assert.Nil(t, initErr)

	background := context.Background()

	// ID lookup fails and reports the zero ID.
	collID, idErr := globalMetaCache.GetCollectionID(background, missing)
	assert.NotNil(t, idErr)
	assert.Equal(t, collID, int64(0))

	// Schema lookup fails and reports no schema.
	collSchema, schemaErr := globalMetaCache.GetCollectionSchema(background, missing)
	assert.NotNil(t, schemaErr)
	assert.Nil(t, collSchema)
}
......@@ -103,14 +231,40 @@ func TestMetaCache_GetPartitionID(t *testing.T) {
id, err = globalMetaCache.GetPartitionID(ctx, "collection1", "par2")
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(2))
id, err = globalMetaCache.GetPartitionID(ctx, "collection1", "par3")
id, err = globalMetaCache.GetPartitionID(ctx, "collection2", "par1")
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(3))
id, err = globalMetaCache.GetPartitionID(ctx, "collection2", "par2")
assert.Nil(t, err)
assert.Equal(t, id, typeutil.UniqueID(4))
}
// TestMetaCache_GetPartitionError covers the failure paths of partition lookups:
// a misaligned ShowPartitions response, an unknown collection, and an unknown
// partition.
// NOTE(review): this span appears to be rendered from a diff, so it may contain
// lines that were removed by the commit — confirm against the committed file.
func TestMetaCache_GetPartitionError(t *testing.T) {
ctx := context.Background()
client := &MockRootCoordClientInterface{}
err := InitMetaCache(client)
assert.Nil(t, err)
// Test the case where ShowPartitionsResponse is not aligned
id, err := globalMetaCache.GetPartitionID(ctx, "errorCollection", "par1")
assert.NotNil(t, err)
log.Debug(err.Error())
assert.Equal(t, id, typeutil.UniqueID(0))
// NOTE(review): the id returned by this call is overwritten below without being
// asserted — possibly a leftover line; confirm.
id, err = globalMetaCache.GetPartitionID(ctx, "collection2", "par3")
partitions, err2 := globalMetaCache.GetPartitions(ctx, "errorCollection")
assert.NotNil(t, err2)
// NOTE(review): this logs err, not the err2 that was just asserted — confirm intended.
log.Debug(err.Error())
assert.Equal(t, len(partitions), 0)
// Test non existed tables
id, err = globalMetaCache.GetPartitionID(ctx, "nonExisted", "par1")
assert.NotNil(t, err)
log.Debug(err.Error())
assert.Equal(t, id, typeutil.UniqueID(0))
// NOTE(review): the result of this call is overwritten immediately by the next
// lookup without being asserted — possibly a leftover line; confirm.
id, err = globalMetaCache.GetPartitionID(ctx, "collection2", "par4")
// Test non existed partition
id, err = globalMetaCache.GetPartitionID(ctx, "collection1", "par3")
assert.NotNil(t, err)
log.Debug(err.Error())
assert.Equal(t, id, typeutil.UniqueID(0))
}
*/
......@@ -1957,16 +1957,17 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
}
availableQueryNodeNum := len(filterSearchResult)
log.Debug("Proxy Search PostExecute stage1", zap.Any("availableQueryNodeNum", availableQueryNodeNum))
log.Debug("Proxy Search PostExecute stage1",
zap.Any("availableQueryNodeNum", availableQueryNodeNum),
zap.Any("time cost", time.Since(t0)))
if availableQueryNodeNum <= 0 {
log.Debug("Proxy Search PostExecute failed", zap.Any("filterReason", filterReason))
st.result = &milvuspb.SearchResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: filterReason,
},
}
return errors.New(filterReason)
return fmt.Errorf("No Available Query node result, filter reason %s: id %d", filterReason, st.ID())
}
availableQueryNodeNum = 0
......@@ -1996,7 +1997,6 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
}
results, err := decodeSearchResults(filterSearchResult)
log.Debug("Proxy Search PostExecute decodeSearchResults", zap.Error(err))
if err != nil {
return err
}
......@@ -2022,7 +2022,6 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
}
}
}
log.Debug("Proxy Search PostExecute Done")
return nil
}
}
......
package proxy
import (
"context"
"fmt"
"strconv"
"testing"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
)
......@@ -577,3 +577,108 @@ func TestTranslateOutputFields(t *testing.T) {
assert.Equal(t, nil, err)
assert.ElementsMatch(t, []string{idFieldName, floatVectorFieldName, binaryVectorFieldName}, outputFields)
}
// TestSearchTask drives searchTask.PostExecute through four scenarios:
//  1. an empty result batch, which must fail;
//  2. a cancelled task context with no result delivered, which must fail;
//  3. a single result carrying an error status, which must fail;
//  4. a single successful result with a nil SlicedBlob, which must succeed
//     and leave a Success status on the task result.
func TestSearchTask(t *testing.T) {
	// newTask builds a searchTask with only the fields this test needs; every
	// scenario gets a fresh task with its own unbuffered result channel.
	newTask := func(taskCtx context.Context) *searchTask {
		return &searchTask{
			ctx:       taskCtx,
			Condition: NewTaskCondition(context.TODO()),
			SearchRequest: &internalpb.SearchRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Search,
					SourceID: Params.ProxyID,
				},
				ResultChannelID: strconv.FormatInt(Params.ProxyID, 10),
			},
			resultBuf: make(chan []*internalpb.SearchResults),
			query:     nil,
			chMgr:     nil,
			qc:        nil,
		}
	}

	// deliver pushes one result batch from a separate goroutine, because the
	// result channel is unbuffered and PostExecute is the receiver.
	deliver := func(task *searchTask, results []*internalpb.SearchResults) {
		go func() {
			task.resultBuf <- results
		}()
	}

	ctx := context.Background()
	ctxCancel, cancel := context.WithCancel(ctx)
	// Calling cancel more than once is safe; the defer guarantees the context
	// is released even if the test exits before the explicit call below.
	defer cancel()

	// no result
	qt := newTask(ctxCancel)
	deliver(qt, []*internalpb.SearchResults{})
	err := qt.PostExecute(context.TODO())
	assert.NotNil(t, err)

	// test trace context done
	cancel()
	err = qt.PostExecute(context.TODO())
	assert.NotNil(t, err)

	// error result
	qt = newTask(ctx)
	deliver(qt, []*internalpb.SearchResults{
		{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    "test",
			},
		},
	})
	err = qt.PostExecute(context.TODO())
	assert.NotNil(t, err)
	log.Debug("PostExecute failed: " + err.Error())

	// check result SlicedBlob
	qt = newTask(ctx)
	deliver(qt, []*internalpb.SearchResults{
		{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_Success,
				Reason:    "test",
			},
			SlicedBlob: nil,
		},
	})
	err = qt.PostExecute(context.TODO())
	assert.Nil(t, err)
	assert.Equal(t, qt.result.Status.ErrorCode, commonpb.ErrorCode_Success)

	// TODO, add decode result, reduce result test
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册