diff --git a/internal/datacoord/index_builder.go b/internal/datacoord/index_builder.go
index 1a3b298e422f04659233dfce79b8821097d8ca5a..0e660cd6357c380d9d4637b74920a39dbd497211 100644
--- a/internal/datacoord/index_builder.go
+++ b/internal/datacoord/index_builder.go
@@ -247,7 +247,7 @@ func (ib *indexBuilder) process(buildID UniqueID) bool {
 	// if all IndexNodes are executing task, wait for one of them to finish the task.
 	nodeID, client := ib.nodeManager.PeekClient(meta)
 	if client == nil {
-		log.Ctx(ib.ctx).RatedInfo(5, "index builder peek client error, there is no available")
+		log.Ctx(ib.ctx).WithRateGroup("dc.indexBuilder", 1, 60).RatedInfo(5, "index builder peek client error, there is no available")
 		return false
 	}
 	// update version and set nodeID
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 2126c7c10ce8c5052f15540cc845add7662d4dee..9e734ee11f01f26954fad82ec87b00571a3a1b6a 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -632,6 +632,7 @@ func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStrea
 }
 
 func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
+	log := log.Ctx(ctx).WithRateGroup("dc.handleTimetick", 1, 60)
 	ch := ttMsg.GetChannelName()
 	ts := ttMsg.GetTimestamp()
 	physical, _ := tsoutil.ParseTS(ts)
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 6ea858f2921bb53eb9c4da7c494a36676b124b1c..1626b43a9d0662fd1197ffd47f67eb56c343690a 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -1139,6 +1139,7 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq
 
 // GetFlushState gets the flush state of multiple segments
 func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
+	log := log.Ctx(ctx).WithRateGroup("dc.GetFlushState", 1, 60)
 	resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}}
 	if s.isClosed() {
 		log.Warn("DataCoord receive GetFlushState request, server closed",
@@ -1159,7 +1160,7 @@
 	}
 
 	if len(unflushed) != 0 {
-		log.RatedInfo(10, "DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("segmentIDs", unflushed), zap.Int("len", len(unflushed)))
+		log.RatedInfo(10, "DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("unflushed", unflushed), zap.Int("len", len(unflushed)))
 		resp.Flushed = false
 	} else {
 		log.Info("DataCoord receive GetFlushState request, Flushed is true", zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Int("len", len(req.GetSegmentIDs())))
diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go
index ce2d00bd0eaee77a0c7d713f230cf78ab2a37487..e7af4dbeb0730147004bb11feca2bf8439dddcb4 100644
--- a/internal/indexnode/indexnode_service.go
+++ b/internal/indexnode/indexnode_service.go
@@ -117,9 +117,12 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
 }
 
 func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest) (*indexpb.QueryJobsResponse, error) {
+	log := log.Ctx(ctx).With(
+		zap.String("ClusterID", req.GetClusterID()),
+	).WithRateGroup("in.queryJobs", 1, 60)
 	if !i.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
 		stateCode := i.lifetime.GetState()
-		log.Ctx(ctx).Warn("index node not ready", zap.String("state", stateCode.String()), zap.String("ClusterID", req.ClusterID))
+		log.Warn("index node not ready", zap.String("state", stateCode.String()))
 		return &indexpb.QueryJobsResponse{
 			Status: &commonpb.Status{
 				ErrorCode: commonpb.ErrorCode_UnexpectedError,
@@ -159,7 +162,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
 		ret.IndexInfos[i].IndexFileKeys = info.fileKeys
 		ret.IndexInfos[i].SerializedSize = info.serializedSize
 		ret.IndexInfos[i].FailReason = info.failReason
-		log.RatedDebug(5, "querying index build task", zap.String("ClusterID", req.ClusterID),
+		log.RatedDebug(5, "querying index build task",
 			zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()),
 			zap.String("fail reason", info.failReason))
 	}
diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go
index 8213b1962d0a76cc364945a7a6208c7a82f8980d..6a665f34572d2072b70291b12e3eb47414cef179 100644
--- a/internal/querycoordv2/balance/rowcount_based_balancer.go
+++ b/internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -17,6 +17,7 @@
 package balance
 
 import (
+	"context"
 	"sort"
 
 	"github.com/samber/lo"
@@ -105,6 +106,7 @@ func (b *RowCountBasedBalancer) Balance() ([]SegmentAssignPlan, []ChannelAssignP
 }
 
 func (b *RowCountBasedBalancer) balanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan) {
+	log := log.Ctx(context.Background()).WithRateGroup("qcv2.rowCountBalancer", 1.0, 60.0)
 	nodes := replica.GetNodes()
 	if len(nodes) == 0 {
 		return nil, nil
diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index d59509a7ae34c546c31a4d6db363c657ab24bcd1..2a3e0c35726fc2fdf8d159aa43c73d8bd7f42e21 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -56,7 +56,7 @@ type distHandler struct {
 
 func (dh *distHandler) start(ctx context.Context) {
 	defer dh.wg.Done()
-	log := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qnv2.distHandler", 1, 60)
+	log := log.Ctx(ctx).With(zap.Int64("nodeID", dh.nodeID)).WithRateGroup("qcv2.distHandler", 1, 60)
 	log.Info("start dist handler")
 	ticker := time.NewTicker(Params.QueryCoordCfg.DistPullInterval.GetAsDuration(time.Millisecond))
 	defer ticker.Stop()
diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go
index 693ebe2742cd140c25a94377394029679ddf19e7..a8c775371580e4edabfead51a24504ecf0e4683f 100644
--- a/internal/querycoordv2/observers/replica_observer.go
+++ b/internal/querycoordv2/observers/replica_observer.go
@@ -81,6 +81,7 @@ func (ob *ReplicaObserver) schedule(ctx context.Context) {
 }
 
 func (ob *ReplicaObserver) checkNodesInReplica() {
+	log := log.Ctx(context.Background()).WithRateGroup("qcv2.replicaObserver", 1, 60)
 	collections := ob.meta.GetAll()
 	for _, collectionID := range collections {
 		removedNodes := make([]int64, 0)
diff --git a/internal/tso/tso.go b/internal/tso/tso.go
index 8a5e0389c8b6397b9a0fd24fc6997275ed1d5ce2..17d7b3ba90412bec02005012a62bc02092771e48 100644
--- a/internal/tso/tso.go
+++ b/internal/tso/tso.go
@@ -30,6 +30,7 @@
 package tso
 
 import (
+	"context"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -174,7 +175,7 @@ func (t *timestampOracle) UpdateTimestamp() error {
 	jetLag := typeutil.SubTimeByWallClock(now, prev.physical)
 
 	if jetLag > 3*UpdateTimestampStep {
-		log.RatedWarn(60.0, "clock offset is huge, check network latency and clock skew", zap.Duration("jet-lag", jetLag),
+		log.Ctx(context.TODO()).WithRateGroup("tso", 1, 60).RatedWarn(60.0, "clock offset is huge, check network latency and clock skew", zap.Duration("jet-lag", jetLag),
 			zap.Time("prev-physical", prev.physical), zap.Time("now", now))
 	}
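
For reference, below is a minimal sketch of the rate-grouped logging pattern these hunks converge on: derive the logger from the request context, attach a named rate group once, then call the Rated* variants inside hot paths. It relies only on the WithRateGroup and RatedInfo calls visible in the diff above; the import path of the log package, the pollSegments function, and its arguments are assumptions for illustration, not part of this change.

package example

import (
	"context"

	"go.uber.org/zap"

	// Import path assumed from the file layout above; not part of the diff.
	"github.com/milvus-io/milvus/internal/log"
)

// pollSegments is a hypothetical hot loop. The logger is built once per call,
// so repeated messages are throttled by the rate group's budget instead of
// being printed on every iteration.
func pollSegments(ctx context.Context, segmentIDs []int64) {
	// Group name and the two numeric arguments mirror the values used in the diff (1, 60).
	log := log.Ctx(ctx).WithRateGroup("example.pollSegments", 1, 60)
	for _, id := range segmentIDs {
		// The leading numeric argument is treated here as the cost charged against
		// the group's budget (assumption based on RatedInfo(5, ...) / RatedInfo(10, ...) above).
		log.RatedInfo(10, "segment still unflushed", zap.Int64("segmentID", id))
	}
}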