diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index 7b7b429eb6e24776d668bf952112a35f371bab9e..2d1d27d541324c2baec573d6ac876b2840b83fec 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -41,7 +41,7 @@ func main() { syscall.SIGTERM, syscall.SIGQUIT) sig := <-sc - log.Info("Get signal to exit", zap.String("signal", sig.String())) + log.Debug("Get signal to exit", zap.String("signal", sig.String())) err = ms.Stop() if err != nil { panic(err) diff --git a/cmd/queryservice/queryservice.go b/cmd/queryservice/queryservice.go index 1cc29da53fdcbfe2a7c978828db00c8c50a83f58..db1d212d2d5711424aecd39de2df75145af8b075 100644 --- a/cmd/queryservice/queryservice.go +++ b/cmd/queryservice/queryservice.go @@ -2,19 +2,30 @@ package main import ( "context" - "log" "os" "os/signal" "syscall" + "go.uber.org/zap" + distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "github.com/zilliztech/milvus-distributed/internal/queryservice" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + queryservice.Params.Init() + log.SetupLogger(&queryservice.Params.Log) + defer func() { + if err := log.Sync(); err != nil { + panic(err) + } + }() + msFactory := pulsarms.NewFactory() svr, err := distributed.NewQueryService(ctx, msFactory) @@ -33,7 +44,7 @@ func main() { syscall.SIGTERM, syscall.SIGQUIT) sig := <-sc - log.Printf("Got %s signal to exit", sig.String()) + log.Debug("Get signal to exit", zap.String("signal", sig.String())) if err := svr.Stop(); err != nil { panic(err) diff --git a/configs/advanced/query_service.yaml b/configs/advanced/query_service.yaml new file mode 100644 index 0000000000000000000000000000000000000000..5c94293a1f21a2d891d0687b6f4257276bd3da8e --- /dev/null +++ b/configs/advanced/query_service.yaml @@ -0,0 +1,13 @@ +# Copyright (C) 2019-2020 Zilliz. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under the License. + +queryService: + nodeID: 200 diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 466bcd3bbd02cb8f5982de5ec25fccaf3e6b14be..61e195a38686de3e3845f53cff57c6a35e1c859e 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -469,7 +469,7 @@ func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeReque ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, } - log.Info("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port)) + log.Debug("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port)) node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID) if err != nil { return nil, err diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 0db2e0ca88a406c9a2e897985a904ee5182ebc51..2eba5e33db6906dae1d14b4d9a91be04a3e86e20 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -4,15 +4,17 @@ import ( "context" "fmt" "io" - "strconv" - "time" - "net" + "strconv" "sync" + "time" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go/config" + "go.uber.org/zap" + "google.golang.org/grpc" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" @@ -25,8 +27,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - "go.uber.org/zap" - "google.golang.org/grpc" ) // grpc wrapper @@ -104,7 +104,7 @@ func (s *Server) init() error { Params.Init() ctx := context.Background() - log.Info("init params done") + log.Debug("init params done") err := s.startGrpc() if err != nil { @@ -114,7 +114,7 @@ func (s *Server) init() error { s.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING) if s.connectProxyService { - log.Info("proxy service", zap.String("address", Params.ProxyServiceAddress)) + log.Debug("proxy service", zap.String("address", Params.ProxyServiceAddress)) proxyService := psc.NewClient(Params.ProxyServiceAddress) if err := proxyService.Init(); err != nil { panic(err) @@ -130,7 +130,7 @@ func (s *Server) init() error { } } if s.connectDataService { - log.Info("data service", zap.String("address", Params.DataServiceAddress)) + log.Debug("data service", zap.String("address", Params.DataServiceAddress)) dataService := dsc.NewClient(Params.DataServiceAddress) if err := dataService.Init(); err != nil { panic(err) @@ -148,7 +148,7 @@ func (s *Server) init() error { } } if s.connectIndexService { - log.Info("index service", zap.String("address", Params.IndexServiceAddress)) + log.Debug("index service", zap.String("address", Params.IndexServiceAddress)) indexService := isc.NewClient(Params.IndexServiceAddress) if err := indexService.Init(); err != nil { panic(err) @@ -175,7 +175,7 @@ func (s *Server) init() error { } } cms.Params.Init() - log.Info("grpc init done ...") + log.Debug("grpc init done ...") if err := s.core.Init(); err != nil { return err @@ -195,10 +195,10 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer s.wg.Done() - log.Info("start grpc ", zap.Int("port", grpcPort)) + log.Debug("start grpc ", zap.Int("port", grpcPort)) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { - log.Warn("GrpcServer:failed to listen", zap.String("error", err.Error())) + log.Error("GrpcServer:failed to listen", zap.String("error", err.Error())) s.grpcErrChan <- err return } @@ -221,7 +221,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { } func (s *Server) start() error { - log.Info("Master Core start ...") + log.Debug("Master Core start ...") if err := s.core.Start(); err != nil { return err } diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index ee968aa62b1be4283dda4bd87feac6c23a6bcc76..69c90c0f52ea047add6307dc63bd1d5083851027 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -2,13 +2,14 @@ package grpcqueryserviceclient import ( "context" - "log" "time" - "google.golang.org/grpc" - otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" + "go.uber.org/zap" + "google.golang.org/grpc" + + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" @@ -55,7 +56,7 @@ func (c *Client) Init() error { } c.grpcClient = querypb.NewQueryServiceClient(c.conn) - log.Printf("connected to queryService, queryService=%s", c.addr) + log.Debug("connected to queryService", zap.String("queryService", c.addr)) return nil } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index d9156283722eab4a66ad26ae5fd047b88e253a6f..a51b99a8af615e65189dbf321d95261f672c8610 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -2,7 +2,6 @@ package grpcqueryservice import ( "context" - "log" "net" "strconv" "sync" @@ -10,18 +9,19 @@ import ( otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing/opentracing-go" - dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" - msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" - "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - + "go.uber.org/zap" "google.golang.org/grpc" + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" qs "github.com/zilliztech/milvus-distributed/internal/queryservice" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" ) type Server struct { @@ -62,7 +62,7 @@ func (s *Server) Run() error { if err := s.init(); err != nil { return err } - log.Println("queryservice init done ...") + log.Debug("queryservice init done ...") if err := s.start(); err != nil { return err @@ -82,8 +82,8 @@ func (s *Server) init() error { } // --- Master Server Client --- - log.Println("Master service address:", Params.MasterAddress) - log.Println("Init master service client ...") + log.Debug("Master service", zap.String("address", Params.MasterAddress)) + log.Debug("Init master service client ...") masterService, err := msc.NewClient(Params.MasterAddress, 20*time.Second) @@ -109,8 +109,8 @@ func (s *Server) init() error { } // --- Data service client --- - log.Println("DataService Address:", Params.DataServiceAddress) - log.Println("QueryService Init data service client ...") + log.Debug("DataService", zap.String("Address", Params.DataServiceAddress)) + log.Debug("QueryService Init data service client ...") dataService := dsc.NewClient(Params.DataServiceAddress) if err = dataService.Init(); err != nil { @@ -140,10 +140,10 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer s.wg.Done() - log.Println("network port: ", grpcPort) + log.Debug("network", zap.String("port", strconv.Itoa(grpcPort))) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { - log.Printf("GrpcServer:failed to listen: %v", err) + log.Debug("GrpcServer:failed to listen:", zap.String("error", err.Error())) s.grpcErrChan <- err return } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 5315bc1907d930ec49e899572abb1ea3d2c9895b..86cc81ffef0eba1426bacfd08a45eaf8c62f793a 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -8,6 +8,9 @@ import ( "sync/atomic" "time" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/allocator" "github.com/zilliztech/milvus-distributed/internal/errors" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" @@ -25,8 +28,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/retry" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.etcd.io/etcd/clientv3" - "go.uber.org/zap" ) // internalpb2 -> internalpb @@ -266,9 +267,9 @@ func (c *Core) checkInit() error { return errors.Errorf("ReleaseCollection is nil") } - log.Info("master", zap.Int64("node id", int64(Params.NodeID))) - log.Info("master", zap.String("dd channel name", Params.DdChannel)) - log.Info("master", zap.String("time tick channel name", Params.TimeTickChannel)) + log.Debug("master", zap.Int64("node id", int64(Params.NodeID))) + log.Debug("master", zap.String("dd channel name", Params.DdChannel)) + log.Debug("master", zap.String("time tick channel name", Params.TimeTickChannel)) return nil } @@ -276,11 +277,11 @@ func (c *Core) startDdScheduler() { for { select { case <-c.ctx.Done(): - log.Info("close dd scheduler, exit task execution loop") + log.Debug("close dd scheduler, exit task execution loop") return case task, ok := <-c.ddReqQueue: if !ok { - log.Info("dd chan is closed, exit task execution loop") + log.Debug("dd chan is closed, exit task execution loop") return } ts, err := task.Ts() @@ -305,11 +306,11 @@ func (c *Core) startTimeTickLoop() { for { select { case <-c.ctx.Done(): - log.Info("close master time tick loop") + log.Debug("close master time tick loop") return case tt, ok := <-c.ProxyTimeTickChan: if !ok { - log.Info("proxyTimeTickStream is closed, exit time tick loop") + log.Warn("proxyTimeTickStream is closed, exit time tick loop") return } if tt <= c.lastTimeTick { @@ -328,11 +329,11 @@ func (c *Core) startDataServiceSegmentLoop() { for { select { case <-c.ctx.Done(): - log.Info("close data service segment loop") + log.Debug("close data service segment loop") return case seg, ok := <-c.DataServiceSegmentChan: if !ok { - log.Info("data service segment is closed, exit loop") + log.Debug("data service segment is closed, exit loop") return } if seg == nil { @@ -352,11 +353,11 @@ func (c *Core) startCreateIndexLoop() { for { select { case <-c.ctx.Done(): - log.Info("close create index loop") + log.Debug("close create index loop") return case t, ok := <-c.indexTaskQueue: if !ok { - log.Info("index task chan has closed, exit loop") + log.Debug("index task chan has closed, exit loop") return } if err := t.BuildIndex(); err != nil { @@ -372,11 +373,11 @@ func (c *Core) startSegmentFlushCompletedLoop() { for { select { case <-c.ctx.Done(): - log.Info("close segment flush completed loop") + log.Debug("close segment flush completed loop") return case seg, ok := <-c.DataNodeSegmentFlushCompletedChan: if !ok { - log.Info("data node segment flush completed chan has colsed, exit loop") + log.Debug("data node segment flush completed chan has colsed, exit loop") } coll, err := c.MetaTable.GetCollectionBySegmentID(seg) if err != nil { @@ -425,7 +426,7 @@ func (c *Core) tsLoop() { } case <-ctx.Done(): // Server is closed and it should return nil. - log.Info("tsLoop is closed") + log.Debug("tsLoop is closed") return } } @@ -645,7 +646,7 @@ func (c *Core) SetProxyService(ctx context.Context, s ProxyServiceInterface) err return err } Params.ProxyTimeTickChannel = rsp.Value - log.Info("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel)) + log.Debug("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel)) c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { status, _ := s.InvalidateCollectionMetaCache(ctx, &proxypb.InvalidateCollMetaCacheRequest{ @@ -675,7 +676,7 @@ func (c *Core) SetDataService(ctx context.Context, s DataServiceInterface) error return err } Params.DataServiceSegmentChannel = rsp.Value - log.Info("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) + log.Debug("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { ts, err := c.tsoAllocator.Alloc(1) @@ -797,7 +798,7 @@ func (c *Core) Init() error { initError = c.setMsgStreams() }) if initError == nil { - log.Info("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)])) + log.Debug("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)])) } return initError } @@ -815,7 +816,7 @@ func (c *Core) Start() error { go c.tsLoop() c.stateCode.Store(internalpb2.StateCode_HEALTHY) }) - log.Info("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)])) + log.Debug("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)])) return nil } @@ -827,7 +828,7 @@ func (c *Core) Stop() error { func (c *Core) GetComponentStates(ctx context.Context) (*internalpb2.ComponentStates, error) { code := c.stateCode.Load().(internalpb2.StateCode) - log.Info("GetComponentStates", zap.String("State Code", internalpb2.StateCode_name[int32(code)])) + log.Debug("GetComponentStates", zap.String("State Code", internalpb2.StateCode_name[int32(code)])) return &internalpb2.ComponentStates{ State: &internalpb2.ComponentInfo{ diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index 890736b03a665da0e8204efbe2cc13e99c726091..092efad014a40d4b44372b6dab54422c06692764 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -6,6 +6,8 @@ import ( "sync" "github.com/golang/protobuf/proto" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" "github.com/zilliztech/milvus-distributed/internal/log" @@ -14,7 +16,6 @@ import ( pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.uber.org/zap" ) const ( diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 9fb62588051a4771e0f28ae857c65aea09e6b005..c83322db2bf04bb0089bcc209129931bf65f166d 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -4,6 +4,8 @@ import ( "fmt" "github.com/golang/protobuf/proto" + "go.uber.org/zap" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" @@ -12,7 +14,6 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" - "go.uber.org/zap" ) type reqTask interface { diff --git a/internal/queryservice/param_table.go b/internal/queryservice/param_table.go index 392da166e23acd89d0809d1a181e87fe7dcb9c3d..7383969a885818d9a50e3e309ad8ffb6753f14c4 100644 --- a/internal/queryservice/param_table.go +++ b/internal/queryservice/param_table.go @@ -1,8 +1,12 @@ package queryservice import ( + "fmt" + "path" + "strconv" "sync" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) @@ -12,6 +16,8 @@ type UniqueID = typeutil.UniqueID type ParamTable struct { paramtable.BaseTable + NodeID uint64 + Address string QueryServiceID UniqueID @@ -20,6 +26,8 @@ type ParamTable struct { // timetick TimeTickChannelName string + + Log log.Config } var Params ParamTable @@ -33,17 +41,60 @@ func (p *ParamTable) Init() { panic(err) } + err = p.LoadYaml("advanced/query_service.yaml") + if err != nil { + panic(err) + } + err = p.LoadYaml("milvus.yaml") if err != nil { panic(err) } + p.initNodeID() + p.initLogCfg() + p.initStatsChannelName() p.initTimeTickChannelName() p.initQueryServiceAddress() }) } +func (p *ParamTable) initNodeID() { + p.NodeID = uint64(p.ParseInt64("queryService.nodeID")) +} + +func (p *ParamTable) initLogCfg() { + p.Log = log.Config{} + format, err := p.Load("log.format") + if err != nil { + panic(err) + } + p.Log.Format = format + level, err := p.Load("log.level") + if err != nil { + panic(err) + } + p.Log.Level = level + devStr, err := p.Load("log.dev") + if err != nil { + panic(err) + } + dev, err := strconv.ParseBool(devStr) + if err != nil { + panic(err) + } + p.Log.Development = dev + p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") + p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") + p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") + rootPath, err := p.Load("log.file.rootPath") + if err != nil { + panic(err) + } + p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("masterservice-%d.log", p.NodeID)) +} + func (p *ParamTable) initStatsChannelName() { channels, err := p.Load("msgChannel.chanNamePrefix.queryNodeStats") if err != nil { diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index fed410a6a473422eb8a91af840cfbc59054e0da6..e21881849a7f90851f568ef8ab76dbe744578a29 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -12,8 +12,11 @@ import ( "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go/config" + "go.uber.org/zap" + nodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/querynode/client" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -142,7 +145,7 @@ func (qs *QueryService) GetStatisticsChannel(ctx context.Context) (*milvuspb.Str } func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { - fmt.Println("register query node =", req.Address) + log.Debug("register query node", zap.String("address", req.Address.String())) // TODO:: add mutex nodeID := req.Base.SourceID if _, ok := qs.queryNodes[nodeID]; ok { @@ -201,7 +204,7 @@ func (qs *QueryService) RegisterNode(ctx context.Context, req *querypb.RegisterN func (qs *QueryService) ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) { dbID := req.DbID - fmt.Println("show collection start, dbID = ", dbID) + log.Debug("show collection start, dbID = ", zap.String("dbID", strconv.FormatInt(dbID, 10))) collections, err := qs.replica.getCollections(dbID) collectionIDs := make([]UniqueID, 0) for _, collection := range collections { @@ -215,7 +218,7 @@ func (qs *QueryService) ShowCollections(ctx context.Context, req *querypb.ShowCo }, }, err } - fmt.Println("show collection end") + log.Debug("show collection end") return &querypb.ShowCollectionResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -240,10 +243,10 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol } } - fmt.Println("load collection start, collectionID = ", collectionID) + log.Debug("load collection start", zap.String("collectionID", fmt.Sprintln(collectionID))) _, err := qs.replica.getCollectionByID(dbID, collectionID) if err == nil { - fmt.Println("load collection end, collection already exist, collectionID = ", collectionID) + log.Error("load collection end, collection already exist", zap.String("collectionID", fmt.Sprintln(collectionID))) return fn(nil), nil } @@ -284,17 +287,17 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol status, err := qs.LoadPartitions(ctx, loadPartitionsRequest) - fmt.Println("load collection end, collectionID = ", collectionID) + log.Debug("load collection end", zap.String("collectionID", fmt.Sprintln(collectionID))) return status, err } func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { dbID := req.DbID collectionID := req.CollectionID - fmt.Println("release collection start, collectionID = ", collectionID) + log.Debug("release collection start", zap.String("collectionID", fmt.Sprintln(collectionID))) _, err := qs.replica.getCollectionByID(dbID, collectionID) if err != nil { - fmt.Println("release collection end, query service don't have the log of collection ", collectionID) + log.Error("release collection end, query service don't have the log of", zap.String("collectionID", fmt.Sprintln(collectionID))) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -303,7 +306,7 @@ func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.Rele for nodeID, node := range qs.queryNodes { status, err := node.ReleaseCollection(ctx, req) if err != nil { - fmt.Println("release collection end, node ", nodeID, " occur error") + log.Error("release collection end, node occur error", zap.String("nodeID", fmt.Sprintln(nodeID))) return status, err } } @@ -316,7 +319,7 @@ func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.Rele }, err } - fmt.Println("release collection end") + log.Debug("release collection end") //TODO:: queryNode cancel subscribe dmChannels return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -365,7 +368,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar ErrorCode: commonpb.ErrorCode_SUCCESS, } } - fmt.Println("load partitions start, partitionIDs = ", partitionIDs) + log.Debug("load partitions start", zap.String("partitionIDs", fmt.Sprintln(partitionIDs))) if len(partitionIDs) == 0 { err := errors.New("partitionIDs are empty") @@ -430,7 +433,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar return fn(err), err } for _, state := range resp.States { - fmt.Println("segment ", state.SegmentID, " 's state is ", state.StartPosition) + log.Error("segment ", zap.String("state.SegmentID", fmt.Sprintln(state.SegmentID)), zap.String("state", fmt.Sprintln(state.StartPosition))) segmentID := state.SegmentID segmentStates[segmentID] = state channelName := state.StartPosition.ChannelName @@ -475,7 +478,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) } - fmt.Println("load partitions end, partitionIDs = ", partitionIDs) + log.Debug("load partitions end", zap.String("partitionIDs", fmt.Sprintln(partitionIDs))) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -485,7 +488,7 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele dbID := req.DbID collectionID := req.CollectionID partitionIDs := req.PartitionIDs - fmt.Println("start release partitions start, partitionIDs = ", partitionIDs) + log.Debug("start release partitions start", zap.String("partitionIDs", fmt.Sprintln(partitionIDs))) toReleasedPartitionID := make([]UniqueID, 0) for _, partitionID := range partitionIDs { _, err := qs.replica.getPartitionByID(dbID, collectionID, partitionID) @@ -513,7 +516,7 @@ func (qs *QueryService) ReleasePartitions(ctx context.Context, req *querypb.Rele } } - fmt.Println("start release partitions end") + log.Debug("start release partitions end") //TODO:: queryNode cancel subscribe dmChannels return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -535,9 +538,9 @@ func (qs *QueryService) CreateQueryChannel(ctx context.Context) (*querypb.Create RequestChannelID: allocatedQueryChannel, ResultChannelID: allocatedQueryResultChannel, } - fmt.Println("query service create query channel, queryChannelName = ", allocatedQueryChannel) + log.Debug("query service create query channel", zap.String("queryChannelName", allocatedQueryChannel)) for nodeID, node := range qs.queryNodes { - fmt.Println("node ", nodeID, " watch query channel") + log.Debug("node watch query channel", zap.String("nodeID", fmt.Sprintln(nodeID))) fn := func() error { _, err := node.AddQueryChannel(ctx, addQueryChannelsRequest) return err @@ -713,7 +716,7 @@ func (qs *QueryService) watchDmChannels(dbID UniqueID, collectionID UniqueID) er if err != nil { return err } - fmt.Println("query node ", nodeID, "watch channels = ", channels) + log.Debug("query node ", zap.String("nodeID", strconv.FormatInt(nodeID, 10)), zap.String("watch channels", fmt.Sprintln(channels))) node.AddDmChannels(channels) }