From 5ce06de3d196b4a7d0180f274d67cbcf9d97039c Mon Sep 17 00:00:00 2001 From: congqixia Date: Wed, 16 Jun 2021 18:35:59 +0800 Subject: [PATCH] Add context usage and fix defer issue (#5796) Signed-off-by: Congqi Xia --- cmd/distributed/roles/roles.go | 4 +- internal/allocator/id.go | 2 +- internal/dataservice/server.go | 2 +- internal/distributed/datanode/service.go | 2 +- .../masterservice/client/client.go | 81 ++++++++++++++----- .../masterservice/masterservice_test.go | 2 +- internal/distributed/proxynode/service.go | 5 +- internal/distributed/querynode/service.go | 11 ++- internal/distributed/queryservice/service.go | 7 +- internal/proxynode/proxy_node.go | 12 ++- internal/proxynode/timestamp.go | 6 +- 11 files changed, 86 insertions(+), 48 deletions(-) diff --git a/cmd/distributed/roles/roles.go b/cmd/distributed/roles/roles.go index a33eb2793..1a4f2ac8e 100644 --- a/cmd/distributed/roles/roles.go +++ b/cmd/distributed/roles/roles.go @@ -71,7 +71,6 @@ func (mr *MilvusRoles) Run(localMsg bool) { } ctx, cancel := context.WithCancel(context.Background()) - defer cancel() if mr.EnableMaster { var ms *components.MasterService @@ -328,4 +327,7 @@ func (mr *MilvusRoles) Run(localMsg bool) { syscall.SIGQUIT) sig := <-sc fmt.Printf("Get %s signal to exit\n", sig.String()) + + // some deferred Stop has race with context cancel + cancel() } diff --git a/internal/allocator/id.go b/internal/allocator/id.go index 7d88ce1d4..46bbb3983 100644 --- a/internal/allocator/id.go +++ b/internal/allocator/id.go @@ -71,7 +71,7 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string func (ia *IDAllocator) Start() error { var err error - ia.masterClient, err = msc.NewClient(ia.metaRoot, ia.etcdEndpoints, 3*time.Second) + ia.masterClient, err = msc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints, 3*time.Second) if err != nil { panic(err) } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index bbef29945..447512129 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -84,7 +84,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro return datanodeclient.NewClient(addr, 3*time.Second) } s.masterClientCreator = func(addr string) (types.MasterService, error) { - return masterclient.NewClient(Params.MetaRootPath, Params.EtcdEndpoints, masterClientTimout) + return masterclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, masterClientTimout) } return s, nil diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 15ea7e279..4b6138e31 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -70,7 +70,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) msFactory: factory, grpcErrChan: make(chan error), newMasterServiceClient: func() (types.MasterService, error) { - return msc.NewClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second) + return msc.NewClient(ctx1, dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second) }, newDataServiceClient: func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService { return dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout) diff --git a/internal/distributed/masterservice/client/client.go b/internal/distributed/masterservice/client/client.go index 5de8df8b9..7a305e6ea 100644 --- a/internal/distributed/masterservice/client/client.go +++ b/internal/distributed/masterservice/client/client.go @@ -13,6 +13,7 @@ package grpcmasterserviceclient import ( "context" + "errors" "fmt" "time" @@ -61,8 +62,13 @@ func getMasterServiceAddr(sess *sessionutil.Session) (string, error) { return ms.Address, nil } -func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) { - sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints) +// NewClient create master service client with specified ectd info and timeout +// ctx execution control context +// metaRoot is the path in etcd for master registration +// etcdEndpoints are the address list for etcd end points +// timeout is default setting for each grpc call +func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) { + sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints) if sess == nil { err := fmt.Errorf("new session error, maybe can not connect to etcd") log.Debug("MasterServiceClient NewClient failed", zap.Error(err)) @@ -72,7 +78,7 @@ func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) ( return &GrpcClient{ grpcClient: nil, conn: nil, - ctx: context.Background(), + ctx: ctx, timeout: timeout, reconnTry: 300, recallTry: 3, @@ -84,7 +90,17 @@ func (c *GrpcClient) connect() error { tracer := opentracing.GlobalTracer() var err error getMasterServiceAddrFn := func() error { - c.addr, err = getMasterServiceAddr(c.sess) + ch := make(chan struct{}, 1) + var err error + go func() { + c.addr, err = getMasterServiceAddr(c.sess) + ch <- struct{}{} + }() + select { + case <-c.ctx.Done(): + return retry.NoRetryError(errors.New("context canceled")) + case <-ch: + } if err != nil { return err } @@ -99,16 +115,26 @@ func (c *GrpcClient) connect() error { log.Debug("MasterServiceClient try reconnect ", zap.String("address", c.addr)) ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout) defer cancelFunc() - conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), - grpc.WithUnaryInterceptor( - otgrpc.OpenTracingClientInterceptor(tracer)), - grpc.WithStreamInterceptor( - otgrpc.OpenTracingStreamClientInterceptor(tracer))) - if err != nil { - return err + var conn *grpc.ClientConn + var err error + ch := make(chan struct{}, 1) + go func() { + conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(), + grpc.WithUnaryInterceptor( + otgrpc.OpenTracingClientInterceptor(tracer)), + grpc.WithStreamInterceptor( + otgrpc.OpenTracingStreamClientInterceptor(tracer))) + ch <- struct{}{} + }() + select { + case <-c.ctx.Done(): + return retry.NoRetryError(errors.New("context canceled")) + case <-ch: } - c.conn = conn - return nil + if err == nil { + c.conn = conn + } + return err } err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc) @@ -143,18 +169,31 @@ func (c *GrpcClient) Register() error { } func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, error) { - ret, err := caller() - if err == nil { - return ret, nil - } - for i := 0; i < c.recallTry; i++ { - err = c.connect() + ch := make(chan struct{}, 1) + var ret interface{} + var err error + go func() { + ret, err = caller() if err == nil { - ret, err = caller() + ch <- struct{}{} + return + } + for i := 0; i < c.recallTry; i++ { + err = c.connect() if err == nil { - return ret, nil + ret, err = caller() + if err == nil { + ch <- struct{}{} + return + } } } + ch <- struct{}{} + }() + select { + case <-c.ctx.Done(): + return nil, errors.New("context canceled") + case <-ch: } return ret, err } diff --git a/internal/distributed/masterservice/masterservice_test.go b/internal/distributed/masterservice/masterservice_test.go index 0f98ff5ea..786197cbd 100644 --- a/internal/distributed/masterservice/masterservice_test.go +++ b/internal/distributed/masterservice/masterservice_test.go @@ -255,7 +255,7 @@ func TestGrpcService(t *testing.T) { svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy) - cli, err := grpcmasterserviceclient.NewClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) + cli, err := grpcmasterserviceclient.NewClient(context.Background(), cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second) assert.Nil(t, err) err = cli.Init() diff --git a/internal/distributed/proxynode/service.go b/internal/distributed/proxynode/service.go index 571a0fa28..93abb7528 100644 --- a/internal/distributed/proxynode/service.go +++ b/internal/distributed/proxynode/service.go @@ -127,7 +127,6 @@ func (s *Server) Run() error { } func (s *Server) init() error { - ctx := context.Background() var err error Params.Init() if !funcutil.CheckPortAvailable(Params.Port) { @@ -171,7 +170,7 @@ func (s *Server) init() error { masterServiceAddr := Params.MasterAddress log.Debug("ProxyNode", zap.String("master address", masterServiceAddr)) timeout := 3 * time.Second - s.masterServiceClient, err = grpcmasterserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout) + s.masterServiceClient, err = grpcmasterserviceclient.NewClient(s.ctx, proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout) if err != nil { log.Debug("ProxyNode new masterServiceClient failed ", zap.Error(err)) return err @@ -181,7 +180,7 @@ func (s *Server) init() error { log.Debug("ProxyNode new masterServiceClient Init ", zap.Error(err)) return err } - err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentHealthy(s.ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200) if err != nil { log.Debug("ProxyNode WaitForComponentHealthy master service failed ", zap.Error(err)) diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index bfdc9949b..900a3405c 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -78,7 +78,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) } func (s *Server) init() error { - ctx := context.Background() Params.Init() Params.LoadFromEnv() Params.LoadFromArgs() @@ -122,7 +121,7 @@ func (s *Server) init() error { } log.Debug("QueryNode start to wait for QueryService ready") - err = funcutil.WaitForComponentInitOrHealthy(ctx, queryService, "QueryService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentInitOrHealthy(s.ctx, queryService, "QueryService", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryNode wait for QueryService ready failed", zap.Error(err)) panic(err) @@ -138,7 +137,7 @@ func (s *Server) init() error { addr := Params.MasterAddress log.Debug("QueryNode start to new MasterServiceClient", zap.Any("QueryServiceAddress", addr)) - masterService, err := msc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second) + masterService, err := msc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second) if err != nil { log.Debug("QueryNode new MasterServiceClient failed", zap.Error(err)) panic(err) @@ -154,7 +153,7 @@ func (s *Server) init() error { panic(err) } log.Debug("QueryNode start to wait for MasterService ready") - err = funcutil.WaitForComponentHealthy(ctx, masterService, "MasterService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentHealthy(s.ctx, masterService, "MasterService", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryNode wait for MasterService ready failed", zap.Error(err)) panic(err) @@ -180,7 +179,7 @@ func (s *Server) init() error { } // wait IndexService healthy log.Debug("QueryNode start to wait for IndexService ready") - err = funcutil.WaitForComponentHealthy(ctx, indexService, "IndexService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentHealthy(s.ctx, indexService, "IndexService", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryNode wait for IndexService ready failed", zap.Error(err)) panic(err) @@ -203,7 +202,7 @@ func (s *Server) init() error { panic(err) } log.Debug("QueryNode start to wait for DataService ready") - err = funcutil.WaitForComponentInitOrHealthy(ctx, dataService, "DataService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentInitOrHealthy(s.ctx, dataService, "DataService", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryNode wait for DataService ready failed", zap.Error(err)) panic(err) diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 985f230a6..355101a24 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -88,7 +88,6 @@ func (s *Server) Run() error { } func (s *Server) init() error { - ctx := context.Background() Params.Init() qs.Params.Init() qs.Params.Port = Params.Port @@ -109,7 +108,7 @@ func (s *Server) init() error { // --- Master Server Client --- log.Debug("QueryService try to new MasterService client", zap.Any("MasterServiceAddress", Params.MasterAddress)) - masterService, err := msc.NewClient(qs.Params.MetaRootPath, qs.Params.EtcdEndpoints, 3*time.Second) + masterService, err := msc.NewClient(s.loopCtx, qs.Params.MetaRootPath, qs.Params.EtcdEndpoints, 3*time.Second) if err != nil { log.Debug("QueryService try to new MasterService client failed", zap.Error(err)) panic(err) @@ -126,7 +125,7 @@ func (s *Server) init() error { } // wait for master init or healthy log.Debug("QueryService try to wait for MasterService ready") - err = funcutil.WaitForComponentInitOrHealthy(ctx, masterService, "MasterService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, masterService, "MasterService", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryService wait for MasterService ready failed", zap.Error(err)) panic(err) @@ -150,7 +149,7 @@ func (s *Server) init() error { panic(err) } log.Debug("QueryService try to wait for DataService ready") - err = funcutil.WaitForComponentInitOrHealthy(ctx, dataService, "DataService", 1000000, time.Millisecond*200) + err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, dataService, "DataService", 1000000, time.Millisecond*200) if err != nil { log.Debug("QueryService wait for DataService ready failed", zap.Error(err)) panic(err) diff --git a/internal/proxynode/proxy_node.go b/internal/proxynode/proxy_node.go index f7b4afd7b..ef8256806 100644 --- a/internal/proxynode/proxy_node.go +++ b/internal/proxynode/proxy_node.go @@ -101,12 +101,10 @@ func (node *ProxyNode) Register() error { } func (node *ProxyNode) Init() error { - ctx := context.Background() - // wait for dataservice state changed to Healthy if node.dataService != nil { log.Debug("ProxyNode wait for dataService ready") - err := funcutil.WaitForComponentHealthy(ctx, node.dataService, "DataService", 1000000, time.Millisecond*200) + err := funcutil.WaitForComponentHealthy(node.ctx, node.dataService, "DataService", 1000000, time.Millisecond*200) if err != nil { log.Debug("ProxyNode wait for dataService ready failed", zap.Error(err)) return err @@ -117,7 +115,7 @@ func (node *ProxyNode) Init() error { // wait for queryService state changed to Healthy if node.queryService != nil { log.Debug("ProxyNode wait for queryService ready") - err := funcutil.WaitForComponentHealthy(ctx, node.queryService, "QueryService", 1000000, time.Millisecond*200) + err := funcutil.WaitForComponentHealthy(node.ctx, node.queryService, "QueryService", 1000000, time.Millisecond*200) if err != nil { log.Debug("ProxyNode wait for queryService ready failed", zap.Error(err)) return err @@ -128,7 +126,7 @@ func (node *ProxyNode) Init() error { // wait for indexservice state changed to Healthy if node.indexService != nil { log.Debug("ProxyNode wait for indexService ready") - err := funcutil.WaitForComponentHealthy(ctx, node.indexService, "IndexService", 1000000, time.Millisecond*200) + err := funcutil.WaitForComponentHealthy(node.ctx, node.indexService, "IndexService", 1000000, time.Millisecond*200) if err != nil { log.Debug("ProxyNode wait for indexService ready failed", zap.Error(err)) return err @@ -137,7 +135,7 @@ func (node *ProxyNode) Init() error { } if node.queryService != nil { - resp, err := node.queryService.CreateQueryChannel(ctx, &querypb.CreateQueryChannelRequest{}) + resp, err := node.queryService.CreateQueryChannel(node.ctx, &querypb.CreateQueryChannelRequest{}) if err != nil { log.Debug("ProxyNode CreateQueryChannel failed", zap.Error(err)) return err @@ -187,7 +185,7 @@ func (node *ProxyNode) Init() error { node.idAllocator = idAllocator node.idAllocator.PeerID = Params.ProxyID - tsoAllocator, err := NewTimestampAllocator(node.masterService, Params.ProxyID) + tsoAllocator, err := NewTimestampAllocator(node.ctx, node.masterService, Params.ProxyID) if err != nil { return err } diff --git a/internal/proxynode/timestamp.go b/internal/proxynode/timestamp.go index 5a1a4540a..134fee0f6 100644 --- a/internal/proxynode/timestamp.go +++ b/internal/proxynode/timestamp.go @@ -22,12 +22,14 @@ import ( ) type TimestampAllocator struct { + ctx context.Context masterService types.MasterService peerID UniqueID } -func NewTimestampAllocator(master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) { +func NewTimestampAllocator(ctx context.Context, master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) { a := &TimestampAllocator{ + ctx: ctx, peerID: peerID, masterService: master, } @@ -35,7 +37,7 @@ func NewTimestampAllocator(master types.MasterService, peerID UniqueID) (*Timest } func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(ta.ctx, 5*time.Second) req := &masterpb.AllocTimestampRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_RequestTSO, -- GitLab