From 3ccfd8419579af28b399d1a075d16a5ef0f3616a Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Mon, 21 Jun 2021 19:20:31 +0800 Subject: [PATCH] update meta sync in query service (#5937) Signed-off-by: bigsheeper --- internal/proxynode/task.go | 3 ++- internal/queryservice/impl.go | 22 +++++++++++----------- internal/queryservice/meta.go | 3 +++ internal/queryservice/task_scheduler.go | 3 +++ 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index f5ec0ec87..9cc190c52 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -1235,8 +1235,9 @@ func (st *SearchTask) PreExecute(ctx context.Context) error { return errors.New(showResp.Status.Reason) } log.Debug("query service show collections", + zap.Any("collID", collID), zap.Any("collections", showResp.CollectionIDs), - zap.Any("collID", collID)) + ) collectionLoaded := false for _, collectionID := range showResp.CollectionIDs { if collectionID == collID { diff --git a/internal/queryservice/impl.go b/internal/queryservice/impl.go index 7c5eeea2f..ca2161d25 100644 --- a/internal/queryservice/impl.go +++ b/internal/queryservice/impl.go @@ -160,11 +160,11 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol } qs.scheduler.Enqueue([]task{loadCollectionTask}) - //err := loadCollectionTask.WaitToFinish() - //if err != nil { - // status.Reason = err.Error() - // return status, err - //} + err := loadCollectionTask.WaitToFinish() + if err != nil { + status.Reason = err.Error() + return status, err + } //qs.meta.setLoadCollection(collectionID, true) log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID)) @@ -275,12 +275,12 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar } qs.scheduler.Enqueue([]task{loadPartitionTask}) - //err := loadPartitionTask.WaitToFinish() - //if err != nil { - // status.ErrorCode = commonpb.ErrorCode_UnexpectedError - // status.Reason = err.Error() - // return status, err - //} + err := loadPartitionTask.WaitToFinish() + if err != nil { + status.ErrorCode = commonpb.ErrorCode_UnexpectedError + status.Reason = err.Error() + return status, err + } } status.ErrorCode = commonpb.ErrorCode_Success diff --git a/internal/queryservice/meta.go b/internal/queryservice/meta.go index 82bf57c31..31a67fbac 100644 --- a/internal/queryservice/meta.go +++ b/internal/queryservice/meta.go @@ -188,6 +188,9 @@ func (m *meta) addCollection(collectionID UniqueID, schema *schemapb.CollectionS if err != nil { log.Error("save collectionInfo error", zap.Any("error", err.Error()), zap.Int64("collectionID", collectionID)) } + log.Debug("add collection", + zap.Any("collectionID", collectionID), + ) return nil } diff --git a/internal/queryservice/task_scheduler.go b/internal/queryservice/task_scheduler.go index b96b237f2..d740fbd73 100644 --- a/internal/queryservice/task_scheduler.go +++ b/internal/queryservice/task_scheduler.go @@ -518,6 +518,9 @@ func (scheduler *TaskScheduler) scheduleLoop() { log.Error("scheduleLoop: process task error", zap.Any("error", err.Error())) continue } + if t.Type() == commonpb.MsgType_LoadCollection || t.Type() == commonpb.MsgType_LoadPartitions { + t.Notify(err) + } } log.Debug("scheduleLoop: num of child task", zap.Int("num child task", len(t.GetChildTask()))) for _, childTask := range t.GetChildTask() { -- GitLab