diff --git a/cmd/masterservice/main.go b/cmd/masterservice/main.go index 34b146ecd49bb83ba8c84f66df67f5aa48cc9e0f..7b7b429eb6e24776d668bf952112a35f371bab9e 100644 --- a/cmd/masterservice/main.go +++ b/cmd/masterservice/main.go @@ -2,19 +2,29 @@ package main import ( "context" - "log" "os" "os/signal" "syscall" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" + "github.com/zilliztech/milvus-distributed/internal/log" + "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" + "go.uber.org/zap" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + masterservice.Params.Init() + log.SetupLogger(&masterservice.Params.Log) + defer func() { + if err := log.Sync(); err != nil { + panic(err) + } + }() + msFactory := pulsarms.NewFactory() ms, err := distributed.NewMasterService(ctx, msFactory) if err != nil { @@ -31,7 +41,7 @@ func main() { syscall.SIGTERM, syscall.SIGQUIT) sig := <-sc - log.Printf("Got %s signal to exit", sig.String()) + log.Info("Get signal to exit", zap.String("signal", sig.String())) err = ms.Stop() if err != nil { panic(err) diff --git a/internal/distributed/masterservice/server.go b/internal/distributed/masterservice/server.go index 0b6dd5a2d511d70fb3f4054d8670b37366acfb60..da86d6506a1f44590e03c90812744251dfeacf7e 100644 --- a/internal/distributed/masterservice/server.go +++ b/internal/distributed/masterservice/server.go @@ -4,27 +4,27 @@ import ( "context" "fmt" "io" - "log" "strconv" "time" "net" "sync" + "github.com/opentracing/opentracing-go" + "github.com/uber/jaeger-client-go/config" dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client" isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" psc "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client" qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" - "github.com/zilliztech/milvus-distributed/internal/util/funcutil" - - "github.com/opentracing/opentracing-go" - "github.com/uber/jaeger-client-go/config" + "github.com/zilliztech/milvus-distributed/internal/log" cms "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/util/funcutil" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -102,7 +102,7 @@ func (s *Server) Run() error { func (s *Server) init() error { Params.Init() - log.Println("init params done") + log.Info("init params done") err := s.startGrpc() if err != nil { @@ -112,7 +112,7 @@ func (s *Server) init() error { s.core.UpdateStateCode(internalpb2.StateCode_INITIALIZING) if s.connectProxyService { - log.Printf("proxy service address : %s", Params.ProxyServiceAddress) + log.Info("proxy service", zap.String("address", Params.ProxyServiceAddress)) proxyService := psc.NewClient(Params.ProxyServiceAddress) if err := proxyService.Init(); err != nil { panic(err) @@ -128,7 +128,7 @@ func (s *Server) init() error { } } if s.connectDataService { - log.Printf("data service address : %s", Params.DataServiceAddress) + log.Info("data service", zap.String("address", Params.DataServiceAddress)) dataService := dsc.NewClient(Params.DataServiceAddress) if err := dataService.Init(); err != nil { panic(err) @@ -146,7 +146,7 @@ func (s *Server) init() error { } } if s.connectIndexService { - log.Printf("index service address : %s", Params.IndexServiceAddress) + log.Info("index service", zap.String("address", Params.IndexServiceAddress)) indexService := isc.NewClient(Params.IndexServiceAddress) if err := indexService.Init(); err != nil { panic(err) @@ -173,7 +173,7 @@ func (s *Server) init() error { } } cms.Params.Init() - log.Println("grpc init done ...") + log.Info("grpc init done ...") if err := s.core.Init(); err != nil { return err @@ -193,10 +193,10 @@ func (s *Server) startGrpcLoop(grpcPort int) { defer s.wg.Done() - log.Println("network port: ", grpcPort) + log.Info("start grpc ", zap.Int("port", grpcPort)) lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort)) if err != nil { - log.Printf("GrpcServer:failed to listen: %v", err) + log.Warn("GrpcServer:failed to listen", zap.String("error", err.Error())) s.grpcErrChan <- err return } @@ -215,7 +215,7 @@ func (s *Server) startGrpcLoop(grpcPort int) { } func (s *Server) start() error { - log.Println("Master Core start ...") + log.Info("Master Core start ...") if err := s.core.Start(); err != nil { return err } diff --git a/internal/masterservice/global_allocator.go b/internal/masterservice/global_allocator.go index 6859d2e1292c385300a92283b57303688006de57..b757e2bbd856ff266a567b2a4005e05abb2ff2d5 100644 --- a/internal/masterservice/global_allocator.go +++ b/internal/masterservice/global_allocator.go @@ -1,12 +1,12 @@ package masterservice import ( - "log" "sync/atomic" "time" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.uber.org/zap" @@ -77,7 +77,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { current := (*atomicObject)(atomic.LoadPointer(>a.tso.TSO)) if current == nil || current.physical.Equal(typeutil.ZeroTime) { // If it's leader, maybe SyncTimestamp hasn't completed yet - log.Println("sync hasn't completed yet, wait for a while") + log.Debug("sync hasn't completed yet, wait for a while") time.Sleep(200 * time.Millisecond) continue } @@ -85,8 +85,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (uint64, error) { physical = current.physical.UnixNano() / int64(time.Millisecond) logical = atomic.AddInt64(¤t.logical, int64(count)) if logical >= maxLogical { - log.Println("logical part outside of max logical interval, please check ntp time", - zap.Int("retry-count", i)) + log.Debug("logical part outside of max logical interval, please check ntp time", zap.Int("retry-count", i)) time.Sleep(UpdateTimestampStep) continue } diff --git a/internal/masterservice/master_service.go b/internal/masterservice/master_service.go index 1a7d804156fd48c853b4e9c3b68029d40425c913..90ea84c3d3d9f0f3da77d2f50b0a2d03ed96a439 100644 --- a/internal/masterservice/master_service.go +++ b/internal/masterservice/master_service.go @@ -3,7 +3,6 @@ package masterservice import ( "context" "fmt" - "log" "math/rand" "sync" "sync/atomic" @@ -11,6 +10,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/errors" etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd" + "github.com/zilliztech/milvus-distributed/internal/log" ms "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" @@ -23,6 +23,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" ) // internalpb2 -> internalpb @@ -262,9 +263,9 @@ func (c *Core) checkInit() error { return errors.Errorf("ReleaseCollection is nil") } - log.Printf("master node id = %d", Params.NodeID) - log.Printf("master dd channel name = %s", Params.DdChannel) - log.Printf("master time ticke channel name = %s", Params.TimeTickChannel) + log.Info("master", zap.Int64("node id", int64(Params.NodeID))) + log.Info("master", zap.String("dd channel name", Params.DdChannel)) + log.Info("master", zap.String("time tick channel name", Params.TimeTickChannel)) return nil } @@ -272,11 +273,11 @@ func (c *Core) startDdScheduler() { for { select { case <-c.ctx.Done(): - log.Printf("close dd scheduler, exit task execution loop") + log.Info("close dd scheduler, exit task execution loop") return case task, ok := <-c.ddReqQueue: if !ok { - log.Printf("dd chan is closed, exit task execution loopo") + log.Info("dd chan is closed, exit task execution loop") return } ts, err := task.Ts() @@ -301,18 +302,18 @@ func (c *Core) startTimeTickLoop() { for { select { case <-c.ctx.Done(): - log.Printf("close master time tick loop") + log.Info("close master time tick loop") return case tt, ok := <-c.ProxyTimeTickChan: if !ok { - log.Printf("proxyTimeTickStream is closed, exit time tick loop") + log.Info("proxyTimeTickStream is closed, exit time tick loop") return } if tt <= c.lastTimeTick { - log.Printf("master time tick go back, last time tick = %d, input time tick = %d", c.lastTimeTick, tt) + log.Warn("master time tick go back", zap.Uint64("last time tick", c.lastTimeTick), zap.Uint64("input time tick ", tt)) } if err := c.SendTimeTick(tt); err != nil { - log.Printf("master send time tick into dd and time_tick channel failed: %s", err.Error()) + log.Warn("master send time tick into dd and time_tick channel failed", zap.String("error", err.Error())) } c.lastTimeTick = tt } @@ -324,20 +325,20 @@ func (c *Core) startDataServiceSegmentLoop() { for { select { case <-c.ctx.Done(): - log.Printf("close data service segment loop") + log.Info("close data service segment loop") return case seg, ok := <-c.DataServiceSegmentChan: if !ok { - log.Printf("data service segment is closed, exit loop") + log.Info("data service segment is closed, exit loop") return } if seg == nil { - log.Printf("segment from data service is nill") + log.Warn("segment from data service is nil") } else if err := c.MetaTable.AddSegment(seg); err != nil { //what if master add segment failed, but data service success? - log.Printf("add segment info meta table failed ") + log.Warn("add segment info meta table failed ", zap.String("error", err.Error())) } else { - log.Printf("add segment, collection id = %d, partition id = %d, segment id = %d", seg.CollectionID, seg.PartitionID, seg.SegmentID) + log.Debug("add segment", zap.Int64("collection id", seg.CollectionID), zap.Int64("partition id", seg.PartitionID), zap.Int64("segment id", seg.SegmentID)) } } } @@ -348,17 +349,17 @@ func (c *Core) startCreateIndexLoop() { for { select { case <-c.ctx.Done(): - log.Printf("close create index loop") + log.Info("close create index loop") return case t, ok := <-c.indexTaskQueue: if !ok { - log.Printf("index task chan has closed, exit loop") + log.Info("index task chan has closed, exit loop") return } if err := t.BuildIndex(); err != nil { - log.Printf("create index failed, error = %s", err.Error()) + log.Warn("create index failed", zap.String("error", err.Error())) } else { - log.Printf("create index,index name = %s, field name = %s, segment id = %d", t.indexName, t.fieldSchema.Name, t.segmentID) + log.Debug("create index", zap.String("index name", t.indexName), zap.String("field name", t.fieldSchema.Name), zap.Int64("segment id", t.segmentID)) } } } @@ -368,21 +369,21 @@ func (c *Core) startSegmentFlushCompletedLoop() { for { select { case <-c.ctx.Done(): - log.Printf("close segment flush completed loop") + log.Info("close segment flush completed loop") return case seg, ok := <-c.DataNodeSegmentFlushCompletedChan: if !ok { - log.Printf("data node segment flush completed chan has colsed, exit loop") + log.Info("data node segment flush completed chan has colsed, exit loop") } coll, err := c.MetaTable.GetCollectionBySegmentID(seg) if err != nil { - log.Printf("GetCollectionBySegmentID, error = %s ", err.Error()) + log.Warn("GetCollectionBySegmentID", zap.String("error", err.Error())) break } for _, f := range coll.FieldIndexes { idxInfo, err := c.MetaTable.GetIndexByID(f.IndexID) if err != nil { - log.Printf("index id = %d not found", f.IndexID) + log.Warn("index not found", zap.Int64("index id", f.IndexID)) continue } @@ -412,16 +413,16 @@ func (c *Core) tsLoop() { select { case <-tsoTicker.C: if err := c.tsoAllocator.UpdateTSO(); err != nil { - log.Println("failed to update timestamp", err) + log.Warn("failed to update timestamp", zap.String("error", err.Error())) return } if err := c.idAllocator.UpdateID(); err != nil { - log.Println("failed to update id", err) + log.Warn("failed to update id", zap.String("error", err.Error())) return } case <-ctx.Done(): // Server is closed and it should return nil. - log.Println("tsLoop is closed") + log.Info("tsLoop is closed") return } } @@ -577,7 +578,7 @@ func (c *Core) setMsgStreams() error { return case ttmsgs, ok := <-proxyTimeTickStream.Chan(): if !ok { - log.Printf("proxy time tick msg stream closed") + log.Warn("proxy time tick msg stream closed") return } if len(ttmsgs.Msgs) > 0 { @@ -611,7 +612,7 @@ func (c *Core) setMsgStreams() error { return case segMsg, ok := <-dataServiceStream.Chan(): if !ok { - log.Printf("data service segment msg closed") + log.Warn("data service segment msg closed") } if len(segMsg.Msgs) > 0 { for _, segm := range segMsg.Msgs { @@ -623,7 +624,7 @@ func (c *Core) setMsgStreams() error { if ok { c.DataNodeSegmentFlushCompletedChan <- flushMsg.SegmentFlushCompletedMsg.SegmentID } else { - log.Printf("receive unexpected msg from data service stream, value = %v", segm) + log.Debug("receive unexpected msg from data service stream", zap.Stringer("segment", segInfoMsg.SegmentMsg.Segment)) } } } @@ -641,7 +642,7 @@ func (c *Core) SetProxyService(s ProxyServiceInterface) error { return err } Params.ProxyTimeTickChannel = rsp.Value - log.Printf("proxy time tick channel name = %s", Params.ProxyTimeTickChannel) + log.Info("proxy time tick", zap.String("channel name", Params.ProxyTimeTickChannel)) c.InvalidateCollectionMetaCache = func(ts typeutil.Timestamp, dbName string, collectionName string) error { status, _ := s.InvalidateCollectionMetaCache(&proxypb.InvalidateCollMetaCacheRequest{ @@ -671,7 +672,7 @@ func (c *Core) SetDataService(s DataServiceInterface) error { return err } Params.DataServiceSegmentChannel = rsp.Value - log.Printf("data service segment channel name = %s", Params.DataServiceSegmentChannel) + log.Info("data service segment", zap.String("channel name", Params.DataServiceSegmentChannel)) c.GetBinlogFilePathsFromDataServiceReq = func(segID typeutil.UniqueID, fieldID typeutil.UniqueID) ([]string, error) { ts, err := c.tsoAllocator.Alloc(1) @@ -787,7 +788,7 @@ func (c *Core) Init() error { initError = c.setMsgStreams() }) if initError == nil { - log.Printf("Master service State Code = %s", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)]) + log.Info("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_INITIALIZING)])) } return initError } @@ -805,7 +806,7 @@ func (c *Core) Start() error { go c.tsLoop() c.stateCode.Store(internalpb2.StateCode_HEALTHY) }) - log.Printf("Master service State Code = %s", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)]) + log.Info("Master service", zap.String("State Code", internalpb2.StateCode_name[int32(internalpb2.StateCode_HEALTHY)])) return nil } @@ -817,7 +818,7 @@ func (c *Core) Stop() error { func (c *Core) GetComponentStates() (*internalpb2.ComponentStates, error) { code := c.stateCode.Load().(internalpb2.StateCode) - log.Printf("GetComponentStates : %s", internalpb2.StateCode_name[int32(code)]) + log.Info("GetComponentStates", zap.String("State Code", internalpb2.StateCode_name[int32(code)])) return &internalpb2.ComponentStates{ State: &internalpb2.ComponentInfo{ @@ -861,7 +862,7 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), }, nil } - log.Printf("CreateCollection : %s", in.CollectionName) + log.Debug("CreateCollection ", zap.String("name", in.CollectionName)) t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -872,11 +873,13 @@ func (c *Core) CreateCollection(in *milvuspb.CreateCollectionRequest) (*commonpb c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("CreateCollection failed", zap.String("name", in.CollectionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "Create collection failed: " + err.Error(), }, nil } + log.Debug("CreateCollection Success", zap.String("name", in.CollectionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -891,7 +894,7 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), }, nil } - log.Printf("DropCollection : %s", in.CollectionName) + log.Debug("DropCollection", zap.String("name", in.CollectionName)) t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -902,11 +905,13 @@ func (c *Core) DropCollection(in *milvuspb.DropCollectionRequest) (*commonpb.Sta c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("DropCollection Failed", zap.String("name", in.CollectionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "Drop collection failed: " + err.Error(), }, nil } + log.Debug("DropCollection Success", zap.String("name", in.CollectionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -924,7 +929,7 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR Value: false, }, nil } - log.Printf("HasCollection : %s", in.CollectionName) + log.Debug("HasCollection", zap.String("name", in.CollectionName)) t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -936,6 +941,7 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("HasCollection Failed", zap.String("name", in.CollectionName)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -944,6 +950,7 @@ func (c *Core) HasCollection(in *milvuspb.HasCollectionRequest) (*milvuspb.BoolR Value: false, }, nil } + log.Debug("HasCollection Success", zap.String("name", in.CollectionName)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -965,7 +972,7 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv CollectionID: 0, }, nil } - log.Printf("DescribeCollection : %s", in.CollectionName) + log.Debug("DescribeCollection", zap.String("name", in.CollectionName)) t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -977,6 +984,7 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName)) return &milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -985,6 +993,7 @@ func (c *Core) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milv Schema: nil, }, nil } + log.Debug("DescribeCollection Success", zap.String("name", in.CollectionName)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1003,7 +1012,7 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh CollectionNames: nil, }, nil } - log.Printf("ShowCollections : %s", in.DbName) + log.Debug("ShowCollections", zap.String("dbname", in.DbName)) t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1017,6 +1026,7 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("ShowCollections failed", zap.String("dbname", in.DbName)) return &milvuspb.ShowCollectionResponse{ CollectionNames: nil, Status: &commonpb.Status{ @@ -1025,6 +1035,7 @@ func (c *Core) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.Sh }, }, nil } + log.Debug("ShowCollections Success", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1040,7 +1051,7 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), }, nil } - log.Printf("CreatePartition : %s - %s", in.CollectionName, in.PartitionName) + log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1051,11 +1062,13 @@ func (c *Core) CreatePartition(in *milvuspb.CreatePartitionRequest) (*commonpb.S c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "create partition failed: " + err.Error(), }, nil } + log.Debug("CreatePartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1070,7 +1083,7 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), }, nil } - log.Printf("DropPartition : %s - %s", in.CollectionName, in.PartitionName) + log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1081,11 +1094,13 @@ func (c *Core) DropPartition(in *milvuspb.DropPartitionRequest) (*commonpb.Statu c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "DropPartition failed: " + err.Error(), }, nil } + log.Debug("DropPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1103,7 +1118,7 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes Value: false, }, nil } - log.Printf("HasPartition : %s - %s", in.CollectionName, in.PartitionName) + log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1115,6 +1130,7 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -1123,6 +1139,7 @@ func (c *Core) HasPartition(in *milvuspb.HasPartitionRequest) (*milvuspb.BoolRes Value: false, }, nil } + log.Debug("HasPartition Success", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName)) return &milvuspb.BoolResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -1144,7 +1161,7 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show PartitionIDs: nil, }, nil } - log.Printf("ShowPartitions : %s", in.CollectionName) + log.Debug("ShowPartitions", zap.String("collection name", in.CollectionName)) t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1167,6 +1184,7 @@ func (c *Core) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.Show }, }, nil } + log.Debug("ShowPartitions Success", zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames), zap.Int64s("partition ids", t.Rsp.PartitionIDs)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1182,7 +1200,7 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), }, nil } - log.Printf("CreateIndex : %s - %s ", in.CollectionName, in.FieldName) + log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1193,11 +1211,13 @@ func (c *Core) CreateIndex(in *milvuspb.CreateIndexRequest) (*commonpb.Status, e c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "CreateIndex failed, error = " + err.Error(), }, nil } + log.Debug("CreateIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1215,7 +1235,7 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr IndexDescriptions: nil, }, nil } - log.Printf("DescribeIndex : %s - %s", in.CollectionName, in.FieldName) + log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName)) t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1238,6 +1258,11 @@ func (c *Core) DescribeIndex(in *milvuspb.DescribeIndexRequest) (*milvuspb.Descr IndexDescriptions: nil, }, nil } + idxNames := make([]string, 0, len(t.Rsp.IndexDescriptions)) + for _, i := range t.Rsp.IndexDescriptions { + idxNames = append(idxNames, i.IndexName) + } + log.Debug("DescribeIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1253,7 +1278,7 @@ func (c *Core) DropIndex(in *milvuspb.DropIndexRequest) (*commonpb.Status, error Reason: fmt.Sprintf("state code = %s", internalpb2.StateCode_name[int32(code)]), }, nil } - log.Printf("DropIndex : collection : %s, filed : %s , index : %s", in.CollectionName, in.FieldName, in.IndexName) + log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName)) t := &DropIndexReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1264,11 +1289,13 @@ func (c *Core) DropIndex(in *milvuspb.DropIndexRequest) (*commonpb.Status, error c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, Reason: "DropIndex failed, error = %s" + err.Error(), }, nil } + log.Debug("DropIndex Success", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName)) return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1286,7 +1313,7 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D IndexID: 0, }, nil } - log.Printf("DescribeSegment : %d - %d", in.CollectionID, in.SegmentID) + log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID)) t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1301,6 +1328,7 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D c.ddReqQueue <- t err := t.WaitToFinish() if err != nil { + log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID)) return &milvuspb.DescribeSegmentResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, @@ -1309,6 +1337,7 @@ func (c *Core) DescribeSegment(in *milvuspb.DescribeSegmentRequest) (*milvuspb.D IndexID: 0, }, nil } + log.Debug("DescribeSegment Success", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1327,7 +1356,7 @@ func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegm SegmentIDs: nil, }, nil } - log.Printf("ShowSegments : %d - %d", in.CollectionID, in.PartitionID) + log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID)) t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ cv: make(chan error), @@ -1350,6 +1379,7 @@ func (c *Core) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegm SegmentIDs: nil, }, nil } + log.Debug("ShowSegments Success", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs)) t.Rsp.Status = &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, Reason: "", @@ -1392,7 +1422,7 @@ func (c *Core) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) { Count: in.Count, }, nil } - log.Printf("AllocID : %d", start) + log.Debug("AllocID", zap.Int64("id start", start), zap.Uint32("count", in.Count)) return &masterpb.IDResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, diff --git a/internal/masterservice/meta_table.go b/internal/masterservice/meta_table.go index bece3621b45ac9596e9bd35cc7c62bb724720ab6..890736b03a665da0e8204efbe2cc13e99c726091 100644 --- a/internal/masterservice/meta_table.go +++ b/internal/masterservice/meta_table.go @@ -1,7 +1,6 @@ package masterservice import ( - "log" "path" "strconv" "sync" @@ -9,11 +8,13 @@ import ( "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" pb "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.uber.org/zap" ) const ( @@ -127,7 +128,7 @@ func (mt *metaTable) reloadFromKV() error { } collID, ok := mt.partitionID2CollID[partitionInfo.PartitionID] if !ok { - log.Printf("partition id %d not belong to any collection", partitionInfo.PartitionID) + log.Warn("partition does not belong to any collection", zap.Int64("partition id", partitionInfo.PartitionID)) continue } mt.partitionID2Meta[partitionInfo.PartitionID] = partitionInfo @@ -235,14 +236,14 @@ func (mt *metaTable) DeleteCollection(collID typeutil.UniqueID) error { metaKeys = append(metaKeys, path.Join(PartitionMetaPrefix, strconv.FormatInt(partID, 10))) partMeta, ok := mt.partitionID2Meta[partID] if !ok { - log.Printf("partition id = %d not exist", partID) + log.Warn("partition id not exist", zap.Int64("partition id", partID)) continue } delete(mt.partitionID2Meta, partID) for _, segID := range partMeta.SegmentIDs { segIndexMeta, ok := mt.segID2IndexMeta[segID] if !ok { - log.Printf("segment id = %d not exist", segID) + log.Warn("segment id not exist", zap.Int64("segment id", segID)) continue } delete(mt.segID2IndexMeta, segID) @@ -346,7 +347,7 @@ func (mt *metaTable) AddPartition(collID typeutil.UniqueID, partitionName string for _, t := range coll.PartitionIDs { part, ok := mt.partitionID2Meta[t] if !ok { - log.Printf("partition id = %d not exist", t) + log.Warn("partition id not exist", zap.Int64("partition id", t)) continue } if part.PartitionName == partitionName { @@ -441,7 +442,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str for _, segID := range partMeta.SegmentIDs { segIndexMeta, ok := mt.segID2IndexMeta[segID] if !ok { - log.Printf("segment id = %d has no index meta", segID) + log.Warn("segment has no index meta", zap.Int64("segment id", segID)) continue } delete(mt.segID2IndexMeta, segID) @@ -449,7 +450,7 @@ func (mt *metaTable) DeletePartition(collID typeutil.UniqueID, partitionName str delMetaKeys = append(delMetaKeys, path.Join(SegmentIndexMetaPrefix, strconv.FormatInt(segID, 10), strconv.FormatInt(indexID, 10))) indexMeta, ok := mt.indexID2Meta[segIdxMeta.IndexID] if !ok { - log.Printf("index id = %d not exist", segIdxMeta.IndexID) + log.Warn("index id not exist", zap.Int64("index id", segIdxMeta.IndexID)) continue } delete(mt.indexID2Meta, segIdxMeta.IndexID) @@ -589,7 +590,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. idxMeta, ok := mt.indexID2Meta[info.IndexID] if !ok { fieldIdxInfo = append(fieldIdxInfo, info) - log.Printf("index id = %d not has meta", info.IndexID) + log.Warn("index id not has meta", zap.Int64("index id", info.IndexID)) continue } if idxMeta.IndexName != indexName { @@ -601,7 +602,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. break } if len(fieldIdxInfo) == len(collMeta.FieldIndexes) { - log.Printf("collection = %s, field = %s, index = %s not found", collName, fieldName, indexName) + log.Warn("drop index,index not found", zap.String("collection name", collName), zap.String("filed name", fieldName), zap.String("index name", indexName)) return 0, false, nil } collMeta.FieldIndexes = fieldIdxInfo @@ -614,7 +615,7 @@ func (mt *metaTable) DropIndex(collName, fieldName, indexName string) (typeutil. for _, partID := range collMeta.PartitionIDs { partMeta, ok := mt.partitionID2Meta[partID] if !ok { - log.Printf("partition id = %d not exist", partID) + log.Warn("partition not exist", zap.Int64("partition id", partID)) continue } for _, segID := range partMeta.SegmentIDs { diff --git a/internal/masterservice/param_table.go b/internal/masterservice/param_table.go index 2ac7b57f86f1c5e72f5fe3594b60b19c038e823d..a3d645f8663825f7a2253cb8dd34f6506ac734eb 100644 --- a/internal/masterservice/param_table.go +++ b/internal/masterservice/param_table.go @@ -1,8 +1,12 @@ package masterservice import ( + "fmt" + "path" + "strconv" "sync" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/util/paramtable" ) @@ -30,6 +34,8 @@ type ParamTable struct { DefaultIndexName string Timeout int + + Log log.Config } func (p *ParamTable) Init() { @@ -58,6 +64,8 @@ func (p *ParamTable) Init() { p.initDefaultIndexName() p.initTimeout() + + p.initLogCfg() }) } @@ -160,3 +168,34 @@ func (p *ParamTable) initDefaultIndexName() { func (p *ParamTable) initTimeout() { p.Timeout = p.ParseInt("master.timeout") } + +func (p *ParamTable) initLogCfg() { + p.Log = log.Config{} + format, err := p.Load("log.format") + if err != nil { + panic(err) + } + p.Log.Format = format + level, err := p.Load("log.level") + if err != nil { + panic(err) + } + p.Log.Level = level + devStr, err := p.Load("log.dev") + if err != nil { + panic(err) + } + dev, err := strconv.ParseBool(devStr) + if err != nil { + panic(err) + } + p.Log.Development = dev + p.Log.File.MaxSize = p.ParseInt("log.file.maxSize") + p.Log.File.MaxBackups = p.ParseInt("log.file.maxBackups") + p.Log.File.MaxDays = p.ParseInt("log.file.maxAge") + rootPath, err := p.Load("log.file.rootPath") + if err != nil { + panic(err) + } + p.Log.File.Filename = path.Join(rootPath, fmt.Sprintf("masterservice-%d.log", p.NodeID)) +} diff --git a/internal/masterservice/task.go b/internal/masterservice/task.go index 475b35cbc1726f681bfdb40ca4dd54bb55553550..372226ec663d73b2a6919e433538ed16e1da70cb 100644 --- a/internal/masterservice/task.go +++ b/internal/masterservice/task.go @@ -2,16 +2,17 @@ package masterservice import ( "fmt" - "log" "github.com/golang/protobuf/proto" "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.uber.org/zap" ) type reqTask interface { @@ -233,7 +234,7 @@ func (t *DropCollectionReqTask) Execute() error { //notify query service to release collection go func() { if err = t.core.ReleaseCollection(t.Req.Base.Timestamp, 0, collMeta.ID); err != nil { - log.Printf("%s", err.Error()) + log.Warn("ReleaseCollection failed", zap.String("error", err.Error())) } }() diff --git a/internal/masterservice/tso.go b/internal/masterservice/tso.go index 1caaf6d5a580288a39b846e6ce930281461f9d3f..e164a48386e29d217975e1f0e613afc8fdf3f45a 100644 --- a/internal/masterservice/tso.go +++ b/internal/masterservice/tso.go @@ -1,17 +1,16 @@ package masterservice import ( - "log" "sync/atomic" "time" "unsafe" - "go.uber.org/zap" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/kv" + "github.com/zilliztech/milvus-distributed/internal/log" "github.com/zilliztech/milvus-distributed/internal/util/tsoutil" "github.com/zilliztech/milvus-distributed/internal/util/typeutil" + "go.uber.org/zap" ) const ( @@ -143,7 +142,7 @@ func (t *timestampOracle) UpdateTimestamp() error { jetLag := typeutil.SubTimeByWallClock(now, prev.physical) if jetLag > 3*UpdateTimestampStep { - log.Print("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) + log.Debug("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now)) } var next time.Time @@ -154,7 +153,7 @@ func (t *timestampOracle) UpdateTimestamp() error { } else if prevLogical > maxLogical/2 { // The reason choosing maxLogical/2 here is that it's big enough for common cases. // Because there is enough timestamp can be allocated before next update. - log.Print("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) + log.Debug("the logical time may be not enough", zap.Int64("prev-logical", prevLogical)) next = prev.physical.Add(time.Millisecond) } else { // It will still use the previous physical time to alloc the timestamp. diff --git a/tests/python/requirements.txt b/tests/python/requirements.txt index ff2c6192aa725c5745420c09eeb05b3bd48f7f90..c42daceb418d91d434ef3aac837d5cf9b6e55964 100644 --- a/tests/python/requirements.txt +++ b/tests/python/requirements.txt @@ -2,7 +2,7 @@ grpcio==1.26.0 grpcio-tools==1.26.0 numpy==1.18.1 pytest-cov==2.8.1 -pymilvus-distributed==0.0.28 +pymilvus-distributed==0.0.29 sklearn==0.0 pytest==4.5.0 pytest-timeout==1.3.3 diff --git a/tests/python/test_index.py b/tests/python/test_index.py index 115d9812366f3b5e989d748f3566b5e5cc84d150..981bad32505f4a7cf2918e09da246742b5a184a9 100644 --- a/tests/python/test_index.py +++ b/tests/python/test_index.py @@ -378,7 +378,7 @@ class TestIndexBase: ****************************************************************** """ - @pytest.mark.skip("drop_index") + @pytest.mark.skip("get_collection_stats") def test_drop_index(self, connect, collection, get_simple_index): ''' target: test drop index interface @@ -392,7 +392,8 @@ class TestIndexBase: # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type assert not stats["partitions"][0]["segments"] - @pytest.mark.skip("drop_index") + @pytest.mark.skip("get_collection_stats") + @pytest.mark.skip("drop_index raise exception") @pytest.mark.level(2) def test_drop_index_repeatly(self, connect, collection, get_simple_index): ''' @@ -409,7 +410,6 @@ class TestIndexBase: # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type assert not stats["partitions"][0]["segments"] - @pytest.mark.skip("drop_index") @pytest.mark.level(2) def test_drop_index_without_connect(self, dis_connect, collection): ''' @@ -420,7 +420,6 @@ class TestIndexBase: with pytest.raises(Exception) as e: dis_connect.drop_index(collection, field_name) - @pytest.mark.skip("drop_index") def test_drop_index_collection_not_existed(self, connect): ''' target: test drop index interface when collection name not existed @@ -432,7 +431,6 @@ class TestIndexBase: with pytest.raises(Exception) as e: connect.drop_index(collection_name, field_name) - @pytest.mark.skip("drop_index") def test_drop_index_collection_not_create(self, connect, collection): ''' target: test drop index interface when index not created @@ -455,7 +453,7 @@ class TestIndexBase: connect.create_index(collection, field_name, get_simple_index) connect.drop_index(collection, field_name) - @pytest.mark.skip("drop_index") + @pytest.mark.skip("get_collection_stats") def test_drop_index_ip(self, connect, collection, get_simple_index): ''' target: test drop index interface @@ -470,7 +468,7 @@ class TestIndexBase: # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type assert not stats["partitions"][0]["segments"] - @pytest.mark.skip("drop_index") + @pytest.mark.skip("get_collection_stats") @pytest.mark.level(2) def test_drop_index_repeatly_ip(self, connect, collection, get_simple_index): ''' @@ -488,7 +486,6 @@ class TestIndexBase: # assert stats["partitions"][0]["segments"][0]["index_name"] == default_index_type assert not stats["partitions"][0]["segments"] - @pytest.mark.skip("drop_index") @pytest.mark.level(2) def test_drop_index_without_connect_ip(self, dis_connect, collection): ''' @@ -499,7 +496,6 @@ class TestIndexBase: with pytest.raises(Exception) as e: dis_connect.drop_index(collection, field_name) - @pytest.mark.skip("drop_index") def test_drop_index_collection_not_create_ip(self, connect, collection): ''' target: test drop index interface when index not created @@ -511,6 +507,7 @@ class TestIndexBase: connect.drop_index(collection, field_name) @pytest.mark.skip("drop_index") + @pytest.mark.skip("can't create and drop") @pytest.mark.level(2) def test_create_drop_index_repeatly_ip(self, connect, collection, get_simple_index): ''' @@ -683,7 +680,7 @@ class TestIndexBinary: ****************************************************************** """ - @pytest.mark.skip("get_collection_stats and drop_index do not impl") + @pytest.mark.skip("get_collection_stats") def test_drop_index(self, connect, binary_collection, get_jaccard_index): ''' target: test drop index interface diff --git a/tests/python/test_load_collection.py b/tests/python/test_load_collection.py new file mode 100644 index 0000000000000000000000000000000000000000..2d5e63859594766261a44968d34951868ea4f506 --- /dev/null +++ b/tests/python/test_load_collection.py @@ -0,0 +1,21 @@ +from tests.utils import * +from tests.constants import * + +uniq_id = "load_collection" + +class TestLoadCollection: + """ + ****************************************************************** + The following cases are used to test `load_collection` function + ****************************************************************** + """ + def test_load_collection(self, connect, collection): + ''' + target: test load collection and wait for loading collection + method: insert then flush, when flushed, try load collection + expected: no errors + ''' + ids = connect.insert(collection, default_entities) + ids = connect.insert(collection, default_entity) + connect.flush([collection]) + connect.load_collection(collection) diff --git a/tests/python/test_load_partitions.py b/tests/python/test_load_partitions.py new file mode 100644 index 0000000000000000000000000000000000000000..128b04139cac80bf39f3bcb1dcd11cb17115f43b --- /dev/null +++ b/tests/python/test_load_partitions.py @@ -0,0 +1,26 @@ +from tests.utils import * +from tests.constants import * + +uniq_id = "load_partitions" + +class TestLoadPartitions: + """ + ****************************************************************** + The following cases are used to test `load_partitions` function + ****************************************************************** + """ + def test_load_partitions(self, connect, collection): + ''' + target: test load collection and wait for loading collection + method: insert then flush, when flushed, try load collection + expected: no errors + ''' + partition_tag = "lvn9pq34u8rasjk" + connect.create_partition(collection, partition_tag + "1") + ids = connect.insert(collection, default_entities, partition_tag=partition_tag + "1") + + connect.create_partition(collection, partition_tag + "2") + ids = connect.insert(collection, default_entity, partition_tag=partition_tag + "2") + + connect.flush([collection]) + connect.load_partitions(collection, [partition_tag + "2"])