未验证 提交 24f94112 编写于 作者: C Cai Yudong 提交者: GitHub

Optimize queryTask PostExecute (#10739)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 f5323de2
...@@ -1753,7 +1753,7 @@ func selectSearchResultData(dataArray []*schemapb.SearchResultData, offsets []in ...@@ -1753,7 +1753,7 @@ func selectSearchResultData(dataArray []*schemapb.SearchResultData, offsets []in
return sel return sel
} }
func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchResultData, idx int64) error { func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchResultData, idx int64) {
for i, fieldData := range src.FieldsData { for i, fieldData := range src.FieldsData {
switch fieldType := fieldData.Field.(type) { switch fieldType := fieldData.Field.(type) {
case *schemapb.FieldData_Scalars: case *schemapb.FieldData_Scalars:
...@@ -1829,7 +1829,6 @@ func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchRe ...@@ -1829,7 +1829,6 @@ func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchRe
} }
default: default:
log.Debug("Not supported field type", zap.String("field type", fieldData.Type.String())) log.Debug("Not supported field type", zap.String("field type", fieldData.Type.String()))
return fmt.Errorf("not supported field type: %s", fieldData.Type.String())
} }
case *schemapb.FieldData_Vectors: case *schemapb.FieldData_Vectors:
dim := fieldType.Vectors.Dim dim := fieldType.Vectors.Dim
...@@ -1867,11 +1866,9 @@ func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchRe ...@@ -1867,11 +1866,9 @@ func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchRe
} }
default: default:
log.Debug("Not supported field type", zap.String("field type", fieldData.Type.String())) log.Debug("Not supported field type", zap.String("field type", fieldData.Type.String()))
return fmt.Errorf("not supported field type: %s", fieldData.Type.String())
} }
} }
} }
return nil
} }
//func printSearchResultData(data *schemapb.SearchResultData, header string) { //func printSearchResultData(data *schemapb.SearchResultData, header string) {
...@@ -2035,11 +2032,10 @@ func (st *searchTask) PostExecute(ctx context.Context) error { ...@@ -2035,11 +2032,10 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
} }
} }
availableQueryNodeNum := len(filterSearchResults)
log.Debug("Proxy Search PostExecute stage1", log.Debug("Proxy Search PostExecute stage1",
zap.Any("availableQueryNodeNum", availableQueryNodeNum)) zap.Any("len(filterSearchResults)", len(filterSearchResults)))
tr.Record("Proxy Search PostExecute stage1 done") tr.Record("Proxy Search PostExecute stage1 done")
if availableQueryNodeNum <= 0 { if len(filterSearchResults) <= 0 {
st.result = &milvuspb.SearchResults{ st.result = &milvuspb.SearchResults{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError, ErrorCode: commonpb.ErrorCode_UnexpectedError,
...@@ -2054,12 +2050,11 @@ func (st *searchTask) PostExecute(ctx context.Context) error { ...@@ -2054,12 +2050,11 @@ func (st *searchTask) PostExecute(ctx context.Context) error {
return err return err
} }
log.Debug("Proxy Search PostExecute stage2", zap.Any("availableQueryNodeNum", availableQueryNodeNum)) log.Debug("Proxy Search PostExecute stage2", zap.Any("len(validSearchResults)", len(validSearchResults)))
if len(validSearchResults) <= 0 { if len(validSearchResults) <= 0 {
filterReason += "empty search result\n"
log.Debug("Proxy Search PostExecute stage2 failed", zap.Any("filterReason", filterReason)) log.Debug("Proxy Search PostExecute stage2 failed", zap.Any("filterReason", filterReason))
filterReason += "empty search result\n"
st.result = &milvuspb.SearchResults{ st.result = &milvuspb.SearchResults{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success, ErrorCode: commonpb.ErrorCode_Success,
...@@ -2420,6 +2415,45 @@ func (qt *queryTask) Execute(ctx context.Context) error { ...@@ -2420,6 +2415,45 @@ func (qt *queryTask) Execute(ctx context.Context) error {
return err return err
} }
func copyQueryResultData(dst *milvuspb.QueryResults, src *internalpb.RetrieveResults) {
// handles initialization, cannot use idx==0 since first result may be empty
if len(dst.FieldsData) == 0 {
dst.FieldsData = append(dst.FieldsData, src.FieldsData...)
} else {
for i, fieldData := range src.FieldsData {
switch fieldType := fieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
dstScalar := dst.FieldsData[i].GetScalars()
switch srcScalar := fieldType.Scalars.Data.(type) {
case *schemapb.ScalarField_BoolData:
dstScalar.GetBoolData().Data = append(dstScalar.GetBoolData().Data, srcScalar.BoolData.Data...)
case *schemapb.ScalarField_IntData:
dstScalar.GetIntData().Data = append(dstScalar.GetIntData().Data, srcScalar.IntData.Data...)
case *schemapb.ScalarField_LongData:
dstScalar.GetLongData().Data = append(dstScalar.GetLongData().Data, srcScalar.LongData.Data...)
case *schemapb.ScalarField_FloatData:
dstScalar.GetFloatData().Data = append(dstScalar.GetFloatData().Data, srcScalar.FloatData.Data...)
case *schemapb.ScalarField_DoubleData:
dstScalar.GetDoubleData().Data = append(dstScalar.GetDoubleData().Data, srcScalar.DoubleData.Data...)
default:
log.Debug("Query received not supported data type", zap.String("field type", fieldData.Type.String()))
}
case *schemapb.FieldData_Vectors:
dstVector := dst.FieldsData[i].GetVectors()
switch srcVector := fieldType.Vectors.Data.(type) {
case *schemapb.VectorField_BinaryVector:
dstVector.Data.(*schemapb.VectorField_BinaryVector).BinaryVector =
append(dstVector.Data.(*schemapb.VectorField_BinaryVector).BinaryVector, srcVector.BinaryVector...)
case *schemapb.VectorField_FloatVector:
dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data...)
default:
log.Debug("Query received not supported data type", zap.String("field type", fieldData.Type.String()))
}
}
}
}
}
func (qt *queryTask) PostExecute(ctx context.Context) error { func (qt *queryTask) PostExecute(ctx context.Context) error {
tr := timerecord.NewTimeRecorder("queryTask PostExecute") tr := timerecord.NewTimeRecorder("queryTask PostExecute")
defer func() { defer func() {
...@@ -2430,17 +2464,17 @@ func (qt *queryTask) PostExecute(ctx context.Context) error { ...@@ -2430,17 +2464,17 @@ func (qt *queryTask) PostExecute(ctx context.Context) error {
log.Debug("proxy", zap.Int64("Query: wait to finish failed, timeout!, taskID:", qt.ID())) log.Debug("proxy", zap.Int64("Query: wait to finish failed, timeout!, taskID:", qt.ID()))
return fmt.Errorf("queryTask:wait to finish failed, timeout : %d", qt.ID()) return fmt.Errorf("queryTask:wait to finish failed, timeout : %d", qt.ID())
case retrieveResults := <-qt.resultBuf: case retrieveResults := <-qt.resultBuf:
retrieveResult := make([]*internalpb.RetrieveResults, 0) filterRetrieveResults := make([]*internalpb.RetrieveResults, 0)
var reason string var reason string
for _, partialRetrieveResult := range retrieveResults { for _, partialRetrieveResult := range retrieveResults {
if partialRetrieveResult.Status.ErrorCode == commonpb.ErrorCode_Success { if partialRetrieveResult.Status.ErrorCode == commonpb.ErrorCode_Success {
retrieveResult = append(retrieveResult, partialRetrieveResult) filterRetrieveResults = append(filterRetrieveResults, partialRetrieveResult)
} else { } else {
reason += partialRetrieveResult.Status.Reason + "\n" reason += partialRetrieveResult.Status.Reason + "\n"
} }
} }
if len(retrieveResult) == 0 { if len(filterRetrieveResults) == 0 {
qt.result = &milvuspb.QueryResults{ qt.result = &milvuspb.QueryResults{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError, ErrorCode: commonpb.ErrorCode_UnexpectedError,
...@@ -2452,69 +2486,26 @@ func (qt *queryTask) PostExecute(ctx context.Context) error { ...@@ -2452,69 +2486,26 @@ func (qt *queryTask) PostExecute(ctx context.Context) error {
return errors.New(reason) return errors.New(reason)
} }
availableQueryNodeNum := 0
qt.result = &milvuspb.QueryResults{ qt.result = &milvuspb.QueryResults{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success, ErrorCode: commonpb.ErrorCode_Success,
}, },
FieldsData: make([]*schemapb.FieldData, 0), FieldsData: make([]*schemapb.FieldData, 0),
} }
for _, partialRetrieveResult := range retrieveResult {
availableQueryNodeNum++ validRetrieveResults := make([]*internalpb.RetrieveResults, 0)
if partialRetrieveResult.Ids == nil { for _, partialRetrieveResult := range filterRetrieveResults {
reason += "ids is nil\n" if partialRetrieveResult.Ids != nil {
continue validRetrieveResults = append(validRetrieveResults, partialRetrieveResult)
} else {
// handles initialization, cannot use idx==0 since first result may be empty
if len(qt.result.FieldsData) == 0 {
qt.result.FieldsData = append(qt.result.FieldsData, partialRetrieveResult.FieldsData...)
} else {
for k, fieldData := range partialRetrieveResult.FieldsData {
switch fieldType := fieldData.Field.(type) {
case *schemapb.FieldData_Scalars:
switch scalarType := fieldType.Scalars.Data.(type) {
case *schemapb.ScalarField_BoolData:
qt.result.FieldsData[k].GetScalars().GetBoolData().Data = append(qt.result.FieldsData[k].GetScalars().GetBoolData().Data, scalarType.BoolData.Data...)
case *schemapb.ScalarField_IntData:
qt.result.FieldsData[k].GetScalars().GetIntData().Data = append(qt.result.FieldsData[k].GetScalars().GetIntData().Data, scalarType.IntData.Data...)
case *schemapb.ScalarField_LongData:
qt.result.FieldsData[k].GetScalars().GetLongData().Data = append(qt.result.FieldsData[k].GetScalars().GetLongData().Data, scalarType.LongData.Data...)
case *schemapb.ScalarField_FloatData:
qt.result.FieldsData[k].GetScalars().GetFloatData().Data = append(qt.result.FieldsData[k].GetScalars().GetFloatData().Data, scalarType.FloatData.Data...)
case *schemapb.ScalarField_DoubleData:
qt.result.FieldsData[k].GetScalars().GetDoubleData().Data = append(qt.result.FieldsData[k].GetScalars().GetDoubleData().Data, scalarType.DoubleData.Data...)
default:
log.Debug("Query received not supported data type")
}
case *schemapb.FieldData_Vectors:
switch vectorType := fieldType.Vectors.Data.(type) {
case *schemapb.VectorField_BinaryVector:
qt.result.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector = append(qt.result.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector, vectorType.BinaryVector...)
case *schemapb.VectorField_FloatVector:
qt.result.FieldsData[k].GetVectors().GetFloatVector().Data = append(qt.result.FieldsData[k].GetVectors().GetFloatVector().Data, vectorType.FloatVector.Data...)
}
default:
}
}
}
} }
} }
if availableQueryNodeNum == 0 { for _, partialRetrieveResult := range validRetrieveResults {
log.Info("Not any valid result found.", copyQueryResultData(qt.result, partialRetrieveResult)
zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query"))
qt.result = &milvuspb.QueryResults{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: reason,
},
}
return nil
} }
if len(qt.result.FieldsData) == 0 { if len(qt.result.FieldsData) == 0 {
log.Info("Query result is nil.", log.Info("Query result is nil", zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query"))
zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query"))
qt.result = &milvuspb.QueryResults{ qt.result = &milvuspb.QueryResults{
Status: &commonpb.Status{ Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_EmptyCollection, ErrorCode: commonpb.ErrorCode_EmptyCollection,
...@@ -2539,8 +2530,7 @@ func (qt *queryTask) PostExecute(ctx context.Context) error { ...@@ -2539,8 +2530,7 @@ func (qt *queryTask) PostExecute(ctx context.Context) error {
} }
} }
log.Info("Query PostExecute done.", log.Info("Query PostExecute done", zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query"))
zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query"))
return nil return nil
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册