From 38845d9669acf384287848d872d00494de21f9e8 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 17 Nov 2022 15:13:08 +0800
Subject: [PATCH] Reduce log frequency in IndexCoord (#20638)

Signed-off-by: cai.zhang
---
 internal/indexcoord/flush_segment_watcher.go | 25 +++-----
 internal/indexcoord/handoff.go               | 31 +++++-----
 internal/indexcoord/handoff_test.go          | 18 +++---
 internal/indexcoord/index_builder.go         | 65 +++++++++-----------
 internal/indexcoord/index_coord.go           |  6 +-
 internal/indexcoord/meta_table.go            |  2 +
 6 files changed, 71 insertions(+), 76 deletions(-)
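The patch follows one pattern throughout: `RatedDebug`/`RatedWarn` lines emitted on every pass of a reconcile loop are deleted or demoted, one-shot `Info`/`Warn` lines are added where state actually changes, and where rated logging survives its interval argument is tightened (for example `RatedDebug(5, ...)`). As a reference for what interval-rated logging does, here is a minimal sketch built on zap and `golang.org/x/time/rate` rather than on Milvus's own `log` wrapper, so the helper type and its exact behavior are assumptions, not the Milvus implementation:

```go
// Hypothetical stand-in for a RatedDebug-style helper: a zap logger wrapped
// with a token-bucket limiter so a hot loop emits at most one line per interval.
package main

import (
	"time"

	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

type ratedLogger struct {
	l   *zap.Logger
	lim *rate.Limiter
}

// newRatedLogger allows one log line per interval, with a burst of one.
func newRatedLogger(l *zap.Logger, interval time.Duration) *ratedLogger {
	return &ratedLogger{l: l, lim: rate.NewLimiter(rate.Every(interval), 1)}
}

// Debug logs only when the limiter has a token; otherwise the line is dropped.
func (r *ratedLogger) Debug(msg string, fields ...zap.Field) {
	if r.lim.Allow() {
		r.l.Debug(msg, fields...)
	}
}

func main() {
	logger, _ := zap.NewDevelopment()
	rl := newRatedLogger(logger, 10*time.Second)
	for i := 0; i < 1_000_000; i++ {
		rl.Debug("scheduler tick", zap.Int("i", i)) // emits roughly once per 10s
	}
}
```

A loop calling `rl.Debug` a million times prints about one line every ten seconds instead of a million, which is the behavior the deleted per-tick lines were approximating.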
diff --git a/internal/indexcoord/flush_segment_watcher.go b/internal/indexcoord/flush_segment_watcher.go
index df88c70f7..ac7ba5fcc 100644
--- a/internal/indexcoord/flush_segment_watcher.go
+++ b/internal/indexcoord/flush_segment_watcher.go
@@ -140,6 +140,8 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segment *datapb.SegmentInf
 			}
 		}
 	}
+	log.Ctx(fsw.ctx).Info("flushed segment task enqueue successfully", zap.Int64("segID", segment.GetID()),
+		zap.Bool("isFake", segment.GetIsFake()))
 }
 
 func (fsw *flushedSegmentWatcher) internalScheduler() {
@@ -229,13 +231,10 @@ func (fsw *flushedSegmentWatcher) setInternalTaskSegmentInfo(segID UniqueID, seg
 	if _, ok := fsw.internalTasks[segID]; ok {
 		fsw.internalTasks[segID].segmentInfo = segInfo
 	}
-	log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID))
 }
 
 func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
 	t := fsw.getInternalTask(segID)
-	log.Ctx(fsw.ctx).RatedDebug(10, "flushedSegmentWatcher process internal task", zap.Int64("segID", segID),
-		zap.String("state", t.state.String()))
 
 	switch t.state {
 	case indexTaskPrepare:
@@ -247,13 +246,13 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
 		}
 		if err := fsw.prepare(segID); err != nil {
-			log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err))
+			log.Ctx(fsw.ctx).Warn("flushedSegmentWatcher prepare internal task fail", zap.Int64("segID", segID), zap.Error(err))
 			return
 		}
 		fsw.updateInternalTaskState(segID, indexTaskInit)
 	case indexTaskInit:
 		if err := fsw.constructTask(t); err != nil {
-			log.Ctx(fsw.ctx).RatedWarn(10, "flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err))
+			log.Ctx(fsw.ctx).Warn("flushedSegmentWatcher construct task fail", zap.Int64("segID", segID), zap.Error(err))
 			return
 		}
 		fsw.updateInternalTaskState(segID, indexTaskInProgress)
@@ -265,8 +264,6 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
 		}
 	case indexTaskDone:
 		if err := fsw.removeFlushedSegment(t); err != nil {
-			log.Ctx(fsw.ctx).RatedWarn(10, "IndexCoord flushSegmentWatcher removeFlushedSegment fail",
-				zap.Int64("segID", segID), zap.Error(err))
 			return
 		}
 		fsw.deleteInternalTask(segID)
@@ -310,17 +307,15 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
 		// send to indexBuilder
 		have, buildID, err := fsw.ic.createIndexForSegment(segIdx)
 		if err != nil {
-			log.Ctx(fsw.ctx).Warn("IndexCoord create index for segment fail", zap.Int64("segID", t.segmentInfo.ID),
-				zap.Int64("indexID", index.IndexID), zap.Error(err))
 			return err
 		}
 		if !have {
 			fsw.builder.enqueue(buildID)
 		}
+		log.Ctx(fsw.ctx).Info("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID),
+			zap.Int64("buildID", buildID), zap.Bool("already have index task", have))
 	}
 	fsw.handoff.enqueue(t.segmentInfo)
-	log.Ctx(fsw.ctx).Debug("flushedSegmentWatcher construct task success", zap.Int64("segID", t.segmentInfo.ID),
-		zap.Int("segments num", len(fieldIndexes)))
 	return nil
 }
 
@@ -328,25 +323,24 @@ func (fsw *flushedSegmentWatcher) removeFlushedSegment(t *internalTask) error {
 	deletedKeys := fmt.Sprintf("%s/%d/%d/%d", util.FlushedSegmentPrefix, t.segmentInfo.CollectionID, t.segmentInfo.PartitionID, t.segmentInfo.ID)
 	err := fsw.kvClient.RemoveWithPrefix(deletedKeys)
 	if err != nil {
-		log.Ctx(fsw.ctx).Warn("IndexCoord remove flushed segment fail", zap.Int64("collID", t.segmentInfo.CollectionID),
+		log.Ctx(fsw.ctx).Warn("IndexCoord remove flushed segment key fail", zap.Int64("collID", t.segmentInfo.CollectionID),
 			zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID), zap.Error(err))
 		return err
 	}
-	log.Ctx(fsw.ctx).Info("IndexCoord remove flushed segment success", zap.Int64("collID", t.segmentInfo.CollectionID),
+	log.Ctx(fsw.ctx).Info("IndexCoord remove flushed segment key success", zap.Int64("collID", t.segmentInfo.CollectionID),
 		zap.Int64("partID", t.segmentInfo.PartitionID), zap.Int64("segID", t.segmentInfo.ID))
 	return nil
 }
 
 func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
 	defer fsw.internalNotifyFunc()
-	log.Debug("prepare flushed segment task", zap.Int64("segID", segID))
 	t := fsw.getInternalTask(segID)
 	if t.segmentInfo != nil {
 		return nil
 	}
 	info, err := fsw.ic.pullSegmentInfo(fsw.ctx, segID)
 	if err != nil {
-		log.Error("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segID),
+		log.Warn("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segID),
 			zap.Error(err))
 		if errors.Is(err, ErrSegmentNotFound) {
 			fsw.deleteInternalTask(segID)
@@ -355,5 +349,6 @@ func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
 		return err
 	}
 	fsw.setInternalTaskSegmentInfo(segID, info)
+	log.Ctx(fsw.ctx).Info("flushedSegmentWatcher prepare task success", zap.Int64("segID", segID))
 	return nil
 }
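The watcher changes above move logging to the edges: enqueue, prepare, and constructTask each log once on success, and the per-tick lines inside `internalProcess` are gone. The general shape of log-on-transition, sketched with illustrative types rather than the Milvus ones:

```go
package main

import "go.uber.org/zap"

type taskState int

const (
	taskInit taskState = iota
	taskInProgress
	taskDone
)

type watcher struct {
	log    *zap.Logger
	states map[int64]taskState
}

// setState logs only on an actual transition, so a reconcile loop that
// revisits the same task every few hundred milliseconds stays quiet.
func (w *watcher) setState(segID int64, next taskState) {
	if w.states[segID] == next {
		return // no transition, nothing worth logging
	}
	w.states[segID] = next
	w.log.Info("task state changed", zap.Int64("segID", segID), zap.Int("state", int(next)))
}

func main() {
	w := &watcher{log: zap.NewExample(), states: map[int64]taskState{}}
	for i := 0; i < 1000; i++ {
		w.setState(7, taskInProgress) // logs exactly once
	}
}
```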
diff --git a/internal/indexcoord/handoff.go b/internal/indexcoord/handoff.go
index 440504365..a514cb64b 100644
--- a/internal/indexcoord/handoff.go
+++ b/internal/indexcoord/handoff.go
@@ -95,7 +95,7 @@ func (hd *handoff) enqueue(segment *datapb.SegmentInfo) {
 	// note: don't reset state if the task contains state
 	hd.segments[segment.GetID()] = segment
-	log.Ctx(hd.ctx).Info("segment need to write handoff",
+	log.Ctx(hd.ctx).Info("handoff task enqueue successfully",
 		zap.Int64("segID", segment.GetID()),
 		zap.Bool("isFake", segment.GetIsFake()),
 	)
@@ -151,13 +151,13 @@ func (hd *handoff) run() {
 	if len(segIDs) > 0 {
 		log.Ctx(hd.ctx).Debug("handoff process...", zap.Int("task num", len(segIDs)))
 	}
-	for i, segID := range segIDs {
-		hd.process(segID, i == 0)
+	for _, segID := range segIDs {
+		hd.process(segID)
 	}
 }
 
-func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo, front bool) {
-	if front || hd.allParentsDone(segment.GetCompactionFrom()) {
+func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo) {
+	if hd.allParentsDone(segment.GetCompactionFrom()) {
 		handoffSegment := &querypb.SegmentInfo{
 			SegmentID:    segment.GetID(),
 			CollectionID: segment.GetCollectionID(),
@@ -180,24 +180,24 @@ func (hd *handoff) handoffFakedSegment(segment *datapb.SegmentInfo, front bool)
 	}
 }
 
-func (hd *handoff) process(segID UniqueID, front bool) {
+func (hd *handoff) process(segID UniqueID) {
 	hd.taskMutex.RLock()
 	segment, ok := hd.segments[segID]
 	hd.taskMutex.RUnlock()
 
 	if !ok {
-		log.Ctx(hd.ctx).Warn("handoff get segment fail", zap.Int64("segID", segID))
+		log.Ctx(hd.ctx).Warn("handoff get task fail", zap.Int64("segID", segID))
 		return
 	}
 
 	if segment.GetIsFake() {
-		hd.handoffFakedSegment(segment, front)
+		hd.handoffFakedSegment(segment)
 		return
 	}
 
 	state := hd.meta.GetSegmentIndexState(segID)
 	log.Ctx(hd.ctx).RatedDebug(30, "handoff task is process", zap.Int64("segID", segID),
-		zap.Bool("front", front), zap.String("state", state.state.String()))
+		zap.String("state", state.state.String()))
 	if state.state == commonpb.IndexState_Failed {
 		log.Ctx(hd.ctx).Error("build index failed, may be need manual intervention", zap.Int64("segID", segID),
 			zap.String("fail reason", state.failReason))
@@ -210,7 +210,7 @@ func (hd *handoff) process(segID UniqueID) {
 	info, err := hd.ic.pullSegmentInfo(hd.ctx, segID)
 	if err != nil {
 		if errors.Is(err, ErrSegmentNotFound) {
-			log.Ctx(hd.ctx).Error("handoff get segment fail", zap.Error(err))
+			log.Ctx(hd.ctx).Warn("handoff get segment fail, remove task", zap.Error(err))
 			hd.deleteTask(segID)
 			return
 		}
@@ -221,12 +221,13 @@ func (hd *handoff) process(segID UniqueID) {
 		log.Debug("segment is importing, can't write handoff event", zap.Int64("segID", segID))
 		return
 	}
-	if front || hd.allParentsDone(info.CompactionFrom) {
-		log.Ctx(hd.ctx).Debug("segment can write handoff event", zap.Int64("segID", segID), zap.Bool("front", front),
+	if hd.allParentsDone(info.CompactionFrom) {
+		log.Ctx(hd.ctx).Debug("segment can write handoff event", zap.Int64("segID", segID),
 			zap.Int64s("compactionFrom", info.CompactionFrom))
 		indexInfos := hd.meta.GetSegmentIndexes(segID)
 		if len(indexInfos) == 0 {
-			log.Ctx(hd.ctx).Warn("ready to write handoff, but there is no index, may be dropped", zap.Int64("segID", segID))
+			log.Ctx(hd.ctx).Warn("ready to write handoff, but there is no index, may be dropped, remove task",
+				zap.Int64("segID", segID))
 			hd.deleteTask(segID)
 			return
 		}
@@ -255,12 +256,12 @@ func (hd *handoff) process(segID UniqueID) {
 			})
 		}
 
-		log.Ctx(hd.ctx).Info("write handoff task success", zap.Int64("segID", segID))
 		if !hd.meta.AlreadyWrittenHandoff(segID) {
 			if err := hd.writeHandoffSegment(handoffTask); err != nil {
 				log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
 				return
 			}
+			log.Ctx(hd.ctx).Info("write handoff success", zap.Int64("segID", segID))
 			if err := hd.meta.MarkSegmentWriteHandoff(segID); err != nil {
 				log.Ctx(hd.ctx).Warn("mark segment as write handoff fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
 				return
@@ -271,6 +272,8 @@ func (hd *handoff) process(segID UniqueID) {
 		hd.deleteTask(segID)
 		return
 	}
+	log.Ctx(hd.ctx).RatedDebug(5, "the handoff of the parent segment has not been written yet",
+		zap.Int64("segID", segID), zap.Int64s("compactionFrom", info.CompactionFrom))
 }
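With the `front` shortcut removed, every handoff, faked or real, now waits until all segments it was compacted from have been handed off, and the only recurring log line left is a `RatedDebug` while waiting on parents. The gating predicate amounts to the following (an illustrative sketch, not the actual `allParentsDone` body):

```go
package handoff

// allParentsDone (illustrative): a child segment produced by compaction is
// handed off only after every parent segment has already been handed off.
// handedOff records which segment IDs have been written.
func allParentsDone(handedOff map[int64]bool, compactionFrom []int64) bool {
	for _, parent := range compactionFrom {
		if !handedOff[parent] {
			return false // still waiting; the RatedDebug above covers this case
		}
	}
	return true
}
```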
diff --git a/internal/indexcoord/handoff_test.go b/internal/indexcoord/handoff_test.go
index 24355aeb4..cbe699a7d 100644
--- a/internal/indexcoord/handoff_test.go
+++ b/internal/indexcoord/handoff_test.go
@@ -177,7 +177,7 @@ func Test_newHandoff(t *testing.T) {
 func Test_process(t *testing.T) {
 	t.Run("not found segment", func(t *testing.T) {
 		hd := &handoff{segments: map[UniqueID]*datapb.SegmentInfo{}}
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 0, hd.Len())
 	})
 
@@ -197,7 +197,7 @@ func Test_process(t *testing.T) {
 			},
 		}
 
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 0, hd.Len())
 	})
 }
@@ -240,7 +240,7 @@ func Test_handoff_error(t *testing.T) {
 				},
 			},
 		}
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 1, hd.Len())
 
 		hd.ic.dataCoordClient = &DataCoordMock{
@@ -248,7 +248,7 @@ func Test_handoff_error(t *testing.T) {
 				return nil, errSegmentNotFound(segID)
 			},
 		}
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 0, hd.Len())
 	})
 
@@ -305,7 +305,7 @@ func Test_handoff_error(t *testing.T) {
 			},
 		}
 
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 1, hd.Len())
 	})
 
@@ -343,7 +343,7 @@ func Test_handoff_error(t *testing.T) {
 			},
 		}
 
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 0, hd.Len())
 	})
 
@@ -386,7 +386,7 @@ func Test_handoff_error(t *testing.T) {
 			},
 		}
 
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 1, hd.Len())
 	})
 
@@ -407,7 +407,7 @@ func Test_handoff_error(t *testing.T) {
 			},
 		}
 
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 1, hd.Len())
 	})
 
@@ -450,7 +450,7 @@ func Test_handoff_error(t *testing.T) {
 			},
 		}
 
-		hd.process(segID, true)
+		hd.process(segID)
 		assert.Equal(t, 1, hd.Len())
 	})
 }
diff --git a/internal/indexcoord/index_builder.go b/internal/indexcoord/index_builder.go
index e52f24a18..583d14430 100644
--- a/internal/indexcoord/index_builder.go
+++ b/internal/indexcoord/index_builder.go
@@ -132,6 +132,7 @@ func (ib *indexBuilder) enqueue(buildID UniqueID) {
 	if _, ok := ib.tasks[buildID]; !ok {
 		ib.tasks[buildID] = indexTaskInit
 	}
+	log.Info("indexBuilder enqueue task", zap.Int64("buildID", buildID))
 }
 
 func (ib *indexBuilder) schedule() {
@@ -174,7 +175,7 @@ func (ib *indexBuilder) run() {
 	for _, buildID := range buildIDs {
 		ok := ib.process(buildID)
 		if !ok {
-			log.Ctx(ib.ctx).Debug("there is no IndexNode available or etcd is not serviceable, wait a minute...")
+			log.Ctx(ib.ctx).Info("there is no IndexNode available or etcd is not serviceable, wait a minute...")
 			break
 		}
 	}
@@ -197,10 +198,9 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 		delete(ib.tasks, buildID)
 	}
 
-	log.Ctx(ib.ctx).RatedDebug(10, "index task is processing", zap.Int64("buildID", buildID), zap.String("task state", state.String()))
 	meta, exist := ib.meta.GetMeta(buildID)
 	if !exist {
-		log.Ctx(ib.ctx).RatedDebug(10, "index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
+		log.Ctx(ib.ctx).RatedDebug(5, "index task has not exist in meta table, remove task", zap.Int64("buildID", buildID))
 		deleteFunc(buildID)
 		return true
 	}
@@ -208,7 +208,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 	switch state {
 	case indexTaskInit:
 		if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
-			log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
+			log.Ctx(ib.ctx).RatedDebug(5, "task is no need to build index, remove it", zap.Int64("buildID", buildID))
 			deleteFunc(buildID)
 			return true
 		}
@@ -223,7 +223,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 				SerializedSize: 0,
 				FailReason:     "",
 			}); err != nil {
-				log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
+				log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", buildID), zap.Error(err))
 				return false
 			}
 			updateStateFunc(buildID, indexTaskDone)
@@ -233,25 +233,25 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 		// if all IndexNodes are executing task, wait for one of them to finish the task.
 		nodeID, client := ib.ic.nodeManager.PeekClient(meta)
 		if client == nil {
-			log.Ctx(ib.ctx).RatedDebug(10, "index builder peek client error, there is no available")
+			log.Ctx(ib.ctx).RatedInfo(5, "index builder peek client error, there is no available")
 			return false
 		}
 		// update version and set nodeID
 		if err := ib.meta.UpdateVersion(buildID, nodeID); err != nil {
-			log.Ctx(ib.ctx).RatedWarn(10, "index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
+			log.Ctx(ib.ctx).Warn("index builder update index version failed", zap.Int64("build", buildID), zap.Error(err))
 			return false
 		}
 		// acquire lock
 		if err := ib.ic.tryAcquireSegmentReferLock(ib.ctx, buildID, nodeID, []UniqueID{meta.SegmentID}); err != nil {
-			log.Ctx(ib.ctx).RatedWarn(10, "index builder acquire segment reference lock failed", zap.Int64("buildID", buildID),
+			log.Ctx(ib.ctx).Warn("index builder acquire segment reference lock failed", zap.Int64("buildID", buildID),
 				zap.Int64("nodeID", nodeID), zap.Error(err))
 			updateStateFunc(buildID, indexTaskRetry)
 			return false
 		}
 		info, err := ib.ic.pullSegmentInfo(ib.ctx, meta.SegmentID)
 		if err != nil {
-			log.Ctx(ib.ctx).RatedWarn(10, "IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID),
+			log.Ctx(ib.ctx).Warn("IndexCoord get segment info from DataCoord fail", zap.Int64("segID", meta.SegmentID),
 				zap.Int64("buildID", buildID), zap.Error(err))
 			if errors.Is(err, ErrSegmentNotFound) {
 				updateStateFunc(buildID, indexTaskDeleted)
@@ -303,7 +303,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 			TypeParams:  typeParams,
 			NumRows:     meta.NumRows,
 		}
-		log.Ctx(ib.ctx).RatedDebug(10, "assign task to indexNode", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
 		if err := ib.ic.assignTask(client, req); err != nil {
 			// need to release lock then reassign, so set task state to retry
 			log.Ctx(ib.ctx).RatedWarn(10, "index builder assign task to IndexNode failed", zap.Int64("buildID", buildID),
@@ -311,21 +310,22 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 			updateStateFunc(buildID, indexTaskRetry)
 			return false
 		}
+		log.Ctx(ib.ctx).Info("index task assigned successfully", zap.Int64("buildID", buildID),
+			zap.Int64("segID", meta.SegmentID), zap.Int64("nodeID", nodeID))
 		// update index meta state to InProgress
 		if err := ib.meta.BuildIndex(buildID); err != nil {
 			// need to release lock then reassign, so set task state to retry
-			log.Ctx(ib.ctx).RatedWarn(10, "index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
+			log.Ctx(ib.ctx).Warn("index builder update index meta to InProgress failed", zap.Int64("buildID", buildID),
 				zap.Int64("nodeID", nodeID), zap.Error(err))
 			updateStateFunc(buildID, indexTaskRetry)
 			return false
 		}
-		log.Ctx(ib.ctx).RatedDebug(10, "index task assigned success", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
 		updateStateFunc(buildID, indexTaskInProgress)
 	case indexTaskDone:
-		log.Ctx(ib.ctx).RatedDebug(10, "index task has done", zap.Int64("buildID", buildID))
 		if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
-			log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
+			log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID),
+				zap.Int64("segID", meta.SegmentID))
 			updateStateFunc(buildID, indexTaskDeleted)
 			return true
 		}
@@ -335,30 +335,26 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 		}
 		if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil {
 			// release lock failed, no need to modify state, wait to retry
-			log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
 			return false
 		}
 		deleteFunc(buildID)
 	case indexTaskRetry:
 		if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
-			log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
+			log.Ctx(ib.ctx).Info("task is no need to build index, remove it", zap.Int64("buildID", buildID))
 			updateStateFunc(buildID, indexTaskDeleted)
 			return true
 		}
 		if !ib.dropIndexTask(buildID, meta.NodeID) {
-			log.Ctx(ib.ctx).RatedWarn(5, "drop index task fail, need retry")
 			return true
 		}
 		if err := ib.releaseLockAndResetTask(buildID, meta.NodeID); err != nil {
 			// release lock failed, no need to modify state, wait to retry
-			log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
 			return false
 		}
 		updateStateFunc(buildID, indexTaskInit)
 	case indexTaskDeleted:
 		log.Ctx(ib.ctx).Debug("index task state is deleted, try to release reference lock", zap.Int64("buildID", buildID))
-		// TODO: delete after QueryCoordV2
 		if err := ib.meta.MarkSegmentsIndexAsDeletedByBuildID([]int64{buildID}); err != nil {
 			return false
 		}
@@ -370,7 +366,6 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 			}
 			if err := ib.releaseLockAndResetNode(buildID, meta.NodeID); err != nil {
 				// release lock failed, no need to modify state, wait to retry
-				log.Ctx(ib.ctx).RatedWarn(10, "index builder try to release reference lock failed", zap.Error(err))
 				return false
 			}
 		}
@@ -378,8 +373,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 		deleteFunc(buildID)
 
 	default:
-		log.Ctx(ib.ctx).Debug("index task is in progress", zap.Int64("buildID", buildID),
-			zap.String("state", meta.IndexState.String()))
+		// state: in_progress
 		if !ib.meta.NeedIndex(meta.CollectionID, meta.IndexID) {
 			log.Ctx(ib.ctx).Debug("task is no need to build index, remove it", zap.Int64("buildID", buildID))
 			updateStateFunc(buildID, indexTaskDeleted)
@@ -391,7 +385,6 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
-	log.Ctx(ib.ctx).Info("IndexCoord indexBuilder get index task state", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
 	client, exist := ib.ic.nodeManager.GetClientByID(nodeID)
 	if exist {
 		ctx1, cancel := context.WithTimeout(ib.ctx, reqTimeoutInterval)
@@ -401,12 +394,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
 			BuildIDs: []int64{buildID},
 		})
 		if err != nil {
-			log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
+			log.Ctx(ib.ctx).Warn("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
 				zap.Error(err))
 			return indexTaskInProgress
 		}
 		if response.Status.ErrorCode != commonpb.ErrorCode_Success {
-			log.Ctx(ib.ctx).Error("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
+			log.Ctx(ib.ctx).Warn("IndexCoord get jobs info from IndexNode fail", zap.Int64("nodeID", nodeID),
 				zap.Int64("buildID", buildID), zap.String("fail reason", response.Status.Reason))
 			return indexTaskInProgress
 		}
@@ -418,7 +411,7 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
 				log.Ctx(ib.ctx).Info("this task has been finished", zap.Int64("buildID", info.BuildID),
 					zap.String("index state", info.State.String()))
 				if err := ib.meta.FinishTask(info); err != nil {
-					log.Ctx(ib.ctx).Error("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID),
+					log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID),
 						zap.String("index state", info.State.String()), zap.Error(err))
 					return indexTaskInProgress
 				}
@@ -435,11 +428,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState {
 		return indexTaskRetry
 	}
 	// !exist --> node down
+	log.Ctx(ib.ctx).Info("this task should be retry, indexNode is no longer exist", zap.Int64("buildID", buildID),
+		zap.Int64("nodeID", nodeID))
 	return indexTaskRetry
 }
 
 func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
-	log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
 	client, exist := ib.ic.nodeManager.GetClientByID(nodeID)
 	if exist {
 		ctx1, cancel := context.WithTimeout(ib.ctx, reqTimeoutInterval)
@@ -458,21 +452,24 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool {
 				zap.Int64("nodeID", nodeID), zap.String("fail reason", status.Reason))
 			return false
 		}
+		log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task success",
+			zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
 		return true
 	}
+	log.Ctx(ib.ctx).Info("IndexNode no longer exist, no need to drop index task",
+		zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
 	return true
 }
 
 func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueID) error {
-	log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID),
-		zap.Int64("nodeID", nodeID))
 	if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil {
 		// release lock failed, no need to modify state, wait to retry
-		log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err))
+		log.Ctx(ib.ctx).Warn("index builder try to release reference lock failed", zap.Int64("buildID", buildID),
+			zap.Int64("nodeID", nodeID), zap.Error(err))
 		return err
 	}
 	if err := ib.meta.ResetNodeID(buildID); err != nil {
-		log.Ctx(ib.ctx).Error("index builder try to reset nodeID failed", zap.Error(err))
+		log.Ctx(ib.ctx).Warn("index builder try to reset nodeID failed", zap.Error(err))
 		return err
 	}
 	log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID),
@@ -481,17 +478,15 @@ func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueI
 }
 
 func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueID) error {
-	log.Ctx(ib.ctx).Info("release segment reference lock and reset task", zap.Int64("buildID", buildID),
-		zap.Int64("nodeID", nodeID))
 	if nodeID != 0 {
 		if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil {
 			// release lock failed, no need to modify state, wait to retry
-			log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err))
+			log.Ctx(ib.ctx).Warn("index builder try to release reference lock failed", zap.Error(err))
 			return err
 		}
 	}
 	if err := ib.meta.ResetMeta(buildID); err != nil {
-		log.Ctx(ib.ctx).Error("index builder try to reset task failed", zap.Error(err))
+		log.Ctx(ib.ctx).Warn("index builder try to reset task failed", zap.Error(err))
 		return err
 	}
 	log.Ctx(ib.ctx).Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID),
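Most deletions in `index_builder.go` remove caller-side duplicates of lines that `releaseLockAndResetNode`, `releaseLockAndResetTask`, and `dropIndexTask` already emit, so each failure is reported exactly once, at the layer that owns it. A sketch of that single-owner convention using standard `%w` wrapping (the helper names here are hypothetical, not the Milvus ones):

```go
package main

import (
	"fmt"

	"go.uber.org/zap"
)

// tryRelease stands in for a lock-release RPC; it always fails here purely
// to exercise the logging path.
func tryRelease(buildID int64) error { return fmt.Errorf("etcd unavailable") }

// releaseLockAndReset owns the Warn line for this failure; callers only
// propagate the wrapped error and decide whether to retry, so nothing is
// logged twice.
func releaseLockAndReset(log *zap.Logger, buildID, nodeID int64) error {
	if err := tryRelease(buildID); err != nil {
		log.Warn("release reference lock failed",
			zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID), zap.Error(err))
		return fmt.Errorf("release lock for build %d: %w", buildID, err)
	}
	return nil
}

func main() {
	logger, _ := zap.NewDevelopment()
	if err := releaseLockAndReset(logger, 1, 2); err != nil {
		_ = err // caller: control flow only, no duplicate log line
	}
}
```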
zap.Int64("buildID", info.BuildID), + log.Ctx(ib.ctx).Warn("IndexCoord update index state fail", zap.Int64("buildID", info.BuildID), zap.String("index state", info.State.String()), zap.Error(err)) return indexTaskInProgress } @@ -435,11 +428,12 @@ func (ib *indexBuilder) getTaskState(buildID, nodeID UniqueID) indexTaskState { return indexTaskRetry } // !exist --> node down + log.Ctx(ib.ctx).Info("this task should be retry, indexNode is no longer exist", zap.Int64("buildID", buildID), + zap.Int64("nodeID", nodeID)) return indexTaskRetry } func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool { - log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) client, exist := ib.ic.nodeManager.GetClientByID(nodeID) if exist { ctx1, cancel := context.WithTimeout(ib.ctx, reqTimeoutInterval) @@ -458,21 +452,24 @@ func (ib *indexBuilder) dropIndexTask(buildID, nodeID UniqueID) bool { zap.Int64("nodeID", nodeID), zap.String("fail reason", status.Reason)) return false } + log.Ctx(ib.ctx).Info("IndexCoord notify IndexNode drop the index task success", + zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) return true } + log.Ctx(ib.ctx).Info("IndexNode no longer exist, no need to drop index task", + zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) return true } func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueID) error { - log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID", zap.Int64("buildID", buildID), - zap.Int64("nodeID", nodeID)) if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err)) + log.Ctx(ib.ctx).Warn("index builder try to release reference lock failed", zap.Int64("buildID", buildID), + zap.Int64("nodeID", nodeID), zap.Error(err)) return err } if err := ib.meta.ResetNodeID(buildID); err != nil { - log.Ctx(ib.ctx).Error("index builder try to reset nodeID failed", zap.Error(err)) + log.Ctx(ib.ctx).Warn("index builder try to reset nodeID failed", zap.Error(err)) return err } log.Ctx(ib.ctx).Info("release segment reference lock and reset nodeID success", zap.Int64("buildID", buildID), @@ -481,17 +478,15 @@ func (ib *indexBuilder) releaseLockAndResetNode(buildID UniqueID, nodeID UniqueI } func (ib *indexBuilder) releaseLockAndResetTask(buildID UniqueID, nodeID UniqueID) error { - log.Ctx(ib.ctx).Info("release segment reference lock and reset task", zap.Int64("buildID", buildID), - zap.Int64("nodeID", nodeID)) if nodeID != 0 { if err := ib.ic.tryReleaseSegmentReferLock(ib.ctx, buildID, nodeID); err != nil { // release lock failed, no need to modify state, wait to retry - log.Ctx(ib.ctx).Error("index builder try to release reference lock failed", zap.Error(err)) + log.Ctx(ib.ctx).Warn("index builder try to release reference lock failed", zap.Error(err)) return err } } if err := ib.meta.ResetMeta(buildID); err != nil { - log.Ctx(ib.ctx).Error("index builder try to reset task failed", zap.Error(err)) + log.Ctx(ib.ctx).Warn("index builder try to reset task failed", zap.Error(err)) return err } log.Ctx(ib.ctx).Info("release segment reference lock and reset task success", zap.Int64("buildID", buildID), diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go index c95b17d66..985fc68fc 100644 --- a/internal/indexcoord/index_coord.go +++ 
diff --git a/internal/indexcoord/meta_table.go b/internal/indexcoord/meta_table.go
index e99f4e697..76d9fc34c 100644
--- a/internal/indexcoord/meta_table.go
+++ b/internal/indexcoord/meta_table.go
@@ -1038,6 +1038,8 @@ func (mt *metaTable) FinishTask(taskInfo *indexpb.IndexTaskInfo) error {
 	}
 
 	mt.updateIndexTasksMetrics()
+	log.Info("finish index task success", zap.Int64("buildID", taskInfo.BuildID),
+		zap.String("state", taskInfo.GetState().String()), zap.String("fail reason", taskInfo.GetFailReason()))
 	return nil
}
-- 
GitLab
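An alternative to hand-rolled rated logging is zap's built-in sampling core, which dedupes by message text per tick. This is not what the patch does, only a reference point for the same trade-off:

```go
package main

import (
	"os"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	enc := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig())
	core := zapcore.NewCore(enc, zapcore.Lock(os.Stdout), zap.DebugLevel)
	// Per message, keep the first 3 entries each second, then 1 in every 100.
	sampled := zapcore.NewSamplerWithOptions(core, time.Second, 3, 100)
	logger := zap.New(sampled)
	for i := 0; i < 10000; i++ {
		logger.Debug("handoff task is process") // mostly dropped after the first few
	}
}
```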