未验证 提交 6886cc3f 编写于 作者: J Jiquan Long 提交者: GitHub

Fix rpc log of Proxy (#20409)

Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
Signed-off-by: Nlongjiquan <jiquan.long@zilliz.com>
上级 2aba154a
......@@ -98,7 +98,7 @@ func (ticker *channelsTimeTickerImpl) tick() error {
stats, err2 := ticker.getStatisticsFunc()
if err2 != nil {
log.Debug("Proxy channelsTimeTickerImpl failed to getStatistics", zap.Error(err2))
log.Warn("failed to get tt statistics", zap.Error(err))
return nil
}
......
......@@ -1851,7 +1851,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
logger.Info(
logger.Debug(
rpcReceived(method),
zap.String("traceID", traceID),
zap.Any("request", request))
......@@ -1899,7 +1899,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
}
}
logger.Info(
logger.Debug(
rpcDone(method),
zap.String("traceID", traceID),
zap.Any("request", request))
......@@ -2464,8 +2464,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))
log.Debug("Start processing insert request in Proxy", zap.String("traceID", traceID))
defer log.Debug("Finish processing insert request in Proxy", zap.String("traceID", traceID))
if !node.checkHealthy() {
return &milvuspb.MutationResult{
......@@ -2538,7 +2538,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
zap.String("traceID", traceID))
if err := node.sched.dmQueue.Enqueue(it); err != nil {
log.Debug("Failed to enqueue insert task: " + err.Error())
log.Warn("Failed to enqueue insert task: " + err.Error())
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel).Inc()
return constructFailedResponse(err), nil
......@@ -2556,7 +2556,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
zap.String("traceID", traceID))
if err := it.WaitToFinish(); err != nil {
log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
log.Warn("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel).Inc()
return constructFailedResponse(err), nil
......@@ -2592,8 +2592,8 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
defer sp.Finish()
traceID, _, _ := trace.InfoFromSpan(sp)
log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
log.Debug("Start processing delete request in Proxy", zap.String("traceID", traceID))
defer log.Debug("Finish processing delete request in Proxy", zap.String("traceID", traceID))
receiveSize := proto.Size(request)
rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))
......@@ -2722,7 +2722,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
travelTs := request.TravelTimestamp
guaranteeTs := request.GuaranteeTimestamp
log.Ctx(ctx).Info(
log.Ctx(ctx).Debug(
rpcReceived(method),
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
......@@ -2970,7 +2970,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.TotalLabel).Inc()
log.Ctx(ctx).Info(
log.Ctx(ctx).Debug(
rpcReceived(method),
zap.String("role", typeutil.ProxyRole),
zap.String("db", request.DbName),
......@@ -3595,14 +3595,14 @@ func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milv
// TODO(wxyu): change name RequestType to Request
drt, err := parseDummyRequestType(req.RequestType)
if err != nil {
log.Debug("Failed to parse dummy request type")
log.Warn("Failed to parse dummy request type", zap.Error(err))
return failedResponse, nil
}
if drt.RequestType == "query" {
drr, err := parseDummyQueryRequest(req.RequestType)
if err != nil {
log.Debug("Failed to parse dummy query request")
log.Warn("Failed to parse dummy query request", zap.Error(err))
return failedResponse, nil
}
......@@ -3615,7 +3615,7 @@ func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milv
_, err = node.Query(ctx, request)
if err != nil {
log.Debug("Failed to execute dummy query")
log.Warn("Failed to execute dummy query", zap.Error(err))
return failedResponse, err
}
......@@ -3723,7 +3723,7 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque
return metrics, nil
}
log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
log.Warn("Proxy.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("node_id", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.String("metric_type", metricType))
......@@ -3800,7 +3800,7 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
return proxyMetrics, nil
}
log.Debug("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
zap.Int64("node_id", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.String("metric_type", metricType))
......@@ -3864,7 +3864,7 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq
// GetReplicas gets replica info
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
log.Info("received get replicas request")
log.Debug("received get replicas request", zap.Int64("collection", req.GetCollectionID()), zap.Bool("with shard nodes", req.GetWithShardNodes()))
resp := &milvuspb.GetReplicasResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
......@@ -3883,13 +3883,13 @@ func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReq
resp.Status.Reason = err.Error()
return resp, nil
}
log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err))
log.Debug("received get replicas response", zap.Any("resp", resp), zap.Error(err))
return resp, nil
}
// GetCompactionState gets the compaction state of multiple segments
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
log.Debug("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
resp := &milvuspb.GetCompactionStateResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
......@@ -3897,7 +3897,7 @@ func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetComp
}
resp, err := node.dataCoord.GetCompactionState(ctx, req)
log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
log.Debug("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
return resp, err
}
......@@ -3917,7 +3917,7 @@ func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCom
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
log.Debug("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
resp := &milvuspb.GetCompactionPlansResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
......@@ -3925,27 +3925,27 @@ func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvusp
}
resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
log.Debug("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
return resp, err
}
// GetFlushState gets the flush state of multiple segments
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
log.Info("received get flush state request", zap.Any("request", req))
log.Debug("received get flush state request", zap.Any("request", req))
var err error
resp := &milvuspb.GetFlushStateResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
log.Info("unable to get flush state because of closed server")
log.Warn("unable to get flush state because of closed server")
return resp, nil
}
resp, err = node.dataCoord.GetFlushState(ctx, req)
if err != nil {
log.Info("failed to get flush state response", zap.Error(err))
log.Warn("failed to get flush state response", zap.Error(err))
return nil, err
}
log.Info("received get flush state response", zap.Any("response", resp))
log.Debug("received get flush state response", zap.Any("response", resp))
return resp, err
}
......@@ -4015,7 +4015,7 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi
// GetImportState checks import task state from RootCoord.
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
log.Debug("received get import state request", zap.Int64("taskID", req.GetTask()))
resp := &milvuspb.GetImportStateResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
......@@ -4035,7 +4035,7 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt
return resp, nil
}
log.Info("successfully received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
log.Debug("successfully received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return resp, nil
......@@ -4043,7 +4043,7 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt
// ListImportTasks get id array of all import tasks from rootcoord
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
log.Info("received list import tasks request")
log.Debug("received list import tasks request")
resp := &milvuspb.ListImportTasksResponse{}
if !node.checkHealthy() {
resp.Status = unhealthyStatus()
......@@ -4062,7 +4062,7 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport
return resp, nil
}
log.Info("successfully received list import tasks response", zap.String("collection", req.CollectionName), zap.Any("tasks", resp.Tasks))
log.Debug("successfully received list import tasks response", zap.String("collection", req.CollectionName), zap.Any("tasks", resp.Tasks))
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return resp, err
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册