From 24f9411249a5db5a2b522936d62274b20c6fea3c Mon Sep 17 00:00:00 2001 From: Cai Yudong Date: Wed, 27 Oct 2021 17:34:35 +0800 Subject: [PATCH] Optimize queryTask PostExecute (#10739) Signed-off-by: yudong.cai --- internal/proxy/task.go | 122 +++++++++++++++++++---------------------- 1 file changed, 56 insertions(+), 66 deletions(-) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 80d51916d..24b4285d1 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -1753,7 +1753,7 @@ func selectSearchResultData(dataArray []*schemapb.SearchResultData, offsets []in return sel } -func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchResultData, idx int64) error { +func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchResultData, idx int64) { for i, fieldData := range src.FieldsData { switch fieldType := fieldData.Field.(type) { case *schemapb.FieldData_Scalars: @@ -1829,7 +1829,6 @@ func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchRe } default: log.Debug("Not supported field type", zap.String("field type", fieldData.Type.String())) - return fmt.Errorf("not supported field type: %s", fieldData.Type.String()) } case *schemapb.FieldData_Vectors: dim := fieldType.Vectors.Dim @@ -1867,11 +1866,9 @@ func copySearchResultData(dst *schemapb.SearchResultData, src *schemapb.SearchRe } default: log.Debug("Not supported field type", zap.String("field type", fieldData.Type.String())) - return fmt.Errorf("not supported field type: %s", fieldData.Type.String()) } } } - return nil } //func printSearchResultData(data *schemapb.SearchResultData, header string) { @@ -2035,11 +2032,10 @@ func (st *searchTask) PostExecute(ctx context.Context) error { } } - availableQueryNodeNum := len(filterSearchResults) log.Debug("Proxy Search PostExecute stage1", - zap.Any("availableQueryNodeNum", availableQueryNodeNum)) + zap.Any("len(filterSearchResults)", len(filterSearchResults))) tr.Record("Proxy Search PostExecute stage1 done") - if availableQueryNodeNum <= 0 { + if len(filterSearchResults) <= 0 { st.result = &milvuspb.SearchResults{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -2054,12 +2050,11 @@ func (st *searchTask) PostExecute(ctx context.Context) error { return err } - log.Debug("Proxy Search PostExecute stage2", zap.Any("availableQueryNodeNum", availableQueryNodeNum)) - + log.Debug("Proxy Search PostExecute stage2", zap.Any("len(validSearchResults)", len(validSearchResults))) if len(validSearchResults) <= 0 { + filterReason += "empty search result\n" log.Debug("Proxy Search PostExecute stage2 failed", zap.Any("filterReason", filterReason)) - filterReason += "empty search result\n" st.result = &milvuspb.SearchResults{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -2420,6 +2415,45 @@ func (qt *queryTask) Execute(ctx context.Context) error { return err } +func copyQueryResultData(dst *milvuspb.QueryResults, src *internalpb.RetrieveResults) { + // handles initialization, cannot use idx==0 since first result may be empty + if len(dst.FieldsData) == 0 { + dst.FieldsData = append(dst.FieldsData, src.FieldsData...) + } else { + for i, fieldData := range src.FieldsData { + switch fieldType := fieldData.Field.(type) { + case *schemapb.FieldData_Scalars: + dstScalar := dst.FieldsData[i].GetScalars() + switch srcScalar := fieldType.Scalars.Data.(type) { + case *schemapb.ScalarField_BoolData: + dstScalar.GetBoolData().Data = append(dstScalar.GetBoolData().Data, srcScalar.BoolData.Data...) + case *schemapb.ScalarField_IntData: + dstScalar.GetIntData().Data = append(dstScalar.GetIntData().Data, srcScalar.IntData.Data...) + case *schemapb.ScalarField_LongData: + dstScalar.GetLongData().Data = append(dstScalar.GetLongData().Data, srcScalar.LongData.Data...) + case *schemapb.ScalarField_FloatData: + dstScalar.GetFloatData().Data = append(dstScalar.GetFloatData().Data, srcScalar.FloatData.Data...) + case *schemapb.ScalarField_DoubleData: + dstScalar.GetDoubleData().Data = append(dstScalar.GetDoubleData().Data, srcScalar.DoubleData.Data...) + default: + log.Debug("Query received not supported data type", zap.String("field type", fieldData.Type.String())) + } + case *schemapb.FieldData_Vectors: + dstVector := dst.FieldsData[i].GetVectors() + switch srcVector := fieldType.Vectors.Data.(type) { + case *schemapb.VectorField_BinaryVector: + dstVector.Data.(*schemapb.VectorField_BinaryVector).BinaryVector = + append(dstVector.Data.(*schemapb.VectorField_BinaryVector).BinaryVector, srcVector.BinaryVector...) + case *schemapb.VectorField_FloatVector: + dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data...) + default: + log.Debug("Query received not supported data type", zap.String("field type", fieldData.Type.String())) + } + } + } + } +} + func (qt *queryTask) PostExecute(ctx context.Context) error { tr := timerecord.NewTimeRecorder("queryTask PostExecute") defer func() { @@ -2430,17 +2464,17 @@ func (qt *queryTask) PostExecute(ctx context.Context) error { log.Debug("proxy", zap.Int64("Query: wait to finish failed, timeout!, taskID:", qt.ID())) return fmt.Errorf("queryTask:wait to finish failed, timeout : %d", qt.ID()) case retrieveResults := <-qt.resultBuf: - retrieveResult := make([]*internalpb.RetrieveResults, 0) + filterRetrieveResults := make([]*internalpb.RetrieveResults, 0) var reason string for _, partialRetrieveResult := range retrieveResults { if partialRetrieveResult.Status.ErrorCode == commonpb.ErrorCode_Success { - retrieveResult = append(retrieveResult, partialRetrieveResult) + filterRetrieveResults = append(filterRetrieveResults, partialRetrieveResult) } else { reason += partialRetrieveResult.Status.Reason + "\n" } } - if len(retrieveResult) == 0 { + if len(filterRetrieveResults) == 0 { qt.result = &milvuspb.QueryResults{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -2452,69 +2486,26 @@ func (qt *queryTask) PostExecute(ctx context.Context) error { return errors.New(reason) } - availableQueryNodeNum := 0 qt.result = &milvuspb.QueryResults{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, FieldsData: make([]*schemapb.FieldData, 0), } - for _, partialRetrieveResult := range retrieveResult { - availableQueryNodeNum++ - if partialRetrieveResult.Ids == nil { - reason += "ids is nil\n" - continue - } else { - // handles initialization, cannot use idx==0 since first result may be empty - if len(qt.result.FieldsData) == 0 { - qt.result.FieldsData = append(qt.result.FieldsData, partialRetrieveResult.FieldsData...) - } else { - for k, fieldData := range partialRetrieveResult.FieldsData { - switch fieldType := fieldData.Field.(type) { - case *schemapb.FieldData_Scalars: - switch scalarType := fieldType.Scalars.Data.(type) { - case *schemapb.ScalarField_BoolData: - qt.result.FieldsData[k].GetScalars().GetBoolData().Data = append(qt.result.FieldsData[k].GetScalars().GetBoolData().Data, scalarType.BoolData.Data...) - case *schemapb.ScalarField_IntData: - qt.result.FieldsData[k].GetScalars().GetIntData().Data = append(qt.result.FieldsData[k].GetScalars().GetIntData().Data, scalarType.IntData.Data...) - case *schemapb.ScalarField_LongData: - qt.result.FieldsData[k].GetScalars().GetLongData().Data = append(qt.result.FieldsData[k].GetScalars().GetLongData().Data, scalarType.LongData.Data...) - case *schemapb.ScalarField_FloatData: - qt.result.FieldsData[k].GetScalars().GetFloatData().Data = append(qt.result.FieldsData[k].GetScalars().GetFloatData().Data, scalarType.FloatData.Data...) - case *schemapb.ScalarField_DoubleData: - qt.result.FieldsData[k].GetScalars().GetDoubleData().Data = append(qt.result.FieldsData[k].GetScalars().GetDoubleData().Data, scalarType.DoubleData.Data...) - default: - log.Debug("Query received not supported data type") - } - case *schemapb.FieldData_Vectors: - switch vectorType := fieldType.Vectors.Data.(type) { - case *schemapb.VectorField_BinaryVector: - qt.result.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector = append(qt.result.FieldsData[k].GetVectors().Data.(*schemapb.VectorField_BinaryVector).BinaryVector, vectorType.BinaryVector...) - case *schemapb.VectorField_FloatVector: - qt.result.FieldsData[k].GetVectors().GetFloatVector().Data = append(qt.result.FieldsData[k].GetVectors().GetFloatVector().Data, vectorType.FloatVector.Data...) - } - default: - } - } - } + + validRetrieveResults := make([]*internalpb.RetrieveResults, 0) + for _, partialRetrieveResult := range filterRetrieveResults { + if partialRetrieveResult.Ids != nil { + validRetrieveResults = append(validRetrieveResults, partialRetrieveResult) } } - if availableQueryNodeNum == 0 { - log.Info("Not any valid result found.", - zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query")) - qt.result = &milvuspb.QueryResults{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: reason, - }, - } - return nil + for _, partialRetrieveResult := range validRetrieveResults { + copyQueryResultData(qt.result, partialRetrieveResult) } if len(qt.result.FieldsData) == 0 { - log.Info("Query result is nil.", - zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query")) + log.Info("Query result is nil", zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query")) qt.result = &milvuspb.QueryResults{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_EmptyCollection, @@ -2539,8 +2530,7 @@ func (qt *queryTask) PostExecute(ctx context.Context) error { } } - log.Info("Query PostExecute done.", - zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query")) + log.Info("Query PostExecute done", zap.Any("requestID", qt.Base.MsgID), zap.Any("requestType", "query")) return nil } -- GitLab