未验证 提交 5ce06de3 编写于 作者: C congqixia 提交者: GitHub

Add context usage and fix defer issue (#5796)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 f28403b9
......@@ -71,7 +71,6 @@ func (mr *MilvusRoles) Run(localMsg bool) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if mr.EnableMaster {
var ms *components.MasterService
......@@ -328,4 +327,7 @@ func (mr *MilvusRoles) Run(localMsg bool) {
syscall.SIGQUIT)
sig := <-sc
fmt.Printf("Get %s signal to exit\n", sig.String())
// some deferred Stop has race with context cancel
cancel()
}
......@@ -71,7 +71,7 @@ func NewIDAllocator(ctx context.Context, metaRoot string, etcdEndpoints []string
func (ia *IDAllocator) Start() error {
var err error
ia.masterClient, err = msc.NewClient(ia.metaRoot, ia.etcdEndpoints, 3*time.Second)
ia.masterClient, err = msc.NewClient(ia.Ctx, ia.metaRoot, ia.etcdEndpoints, 3*time.Second)
if err != nil {
panic(err)
}
......
......@@ -84,7 +84,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, erro
return datanodeclient.NewClient(addr, 3*time.Second)
}
s.masterClientCreator = func(addr string) (types.MasterService, error) {
return masterclient.NewClient(Params.MetaRootPath, Params.EtcdEndpoints, masterClientTimout)
return masterclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, masterClientTimout)
}
return s, nil
......
......@@ -70,7 +70,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
msFactory: factory,
grpcErrChan: make(chan error),
newMasterServiceClient: func() (types.MasterService, error) {
return msc.NewClient(dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second)
return msc.NewClient(ctx1, dn.Params.MetaRootPath, dn.Params.EtcdEndpoints, 3*time.Second)
},
newDataServiceClient: func(etcdMetaRoot string, etcdEndpoints []string, timeout time.Duration) types.DataService {
return dsc.NewClient(etcdMetaRoot, etcdEndpoints, timeout)
......
......@@ -13,6 +13,7 @@ package grpcmasterserviceclient
import (
"context"
"errors"
"fmt"
"time"
......@@ -61,8 +62,13 @@ func getMasterServiceAddr(sess *sessionutil.Session) (string, error) {
return ms.Address, nil
}
func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) {
sess := sessionutil.NewSession(context.Background(), metaRoot, etcdEndpoints)
// NewClient create master service client with specified ectd info and timeout
// ctx execution control context
// metaRoot is the path in etcd for master registration
// etcdEndpoints are the address list for etcd end points
// timeout is default setting for each grpc call
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string, timeout time.Duration) (*GrpcClient, error) {
sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
if sess == nil {
err := fmt.Errorf("new session error, maybe can not connect to etcd")
log.Debug("MasterServiceClient NewClient failed", zap.Error(err))
......@@ -72,7 +78,7 @@ func NewClient(metaRoot string, etcdEndpoints []string, timeout time.Duration) (
return &GrpcClient{
grpcClient: nil,
conn: nil,
ctx: context.Background(),
ctx: ctx,
timeout: timeout,
reconnTry: 300,
recallTry: 3,
......@@ -84,7 +90,17 @@ func (c *GrpcClient) connect() error {
tracer := opentracing.GlobalTracer()
var err error
getMasterServiceAddrFn := func() error {
ch := make(chan struct{}, 1)
var err error
go func() {
c.addr, err = getMasterServiceAddr(c.sess)
ch <- struct{}{}
}()
select {
case <-c.ctx.Done():
return retry.NoRetryError(errors.New("context canceled"))
case <-ch:
}
if err != nil {
return err
}
......@@ -99,16 +115,26 @@ func (c *GrpcClient) connect() error {
log.Debug("MasterServiceClient try reconnect ", zap.String("address", c.addr))
ctx, cancelFunc := context.WithTimeout(c.ctx, c.timeout)
defer cancelFunc()
conn, err := grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
var conn *grpc.ClientConn
var err error
ch := make(chan struct{}, 1)
go func() {
conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer)),
grpc.WithStreamInterceptor(
otgrpc.OpenTracingStreamClientInterceptor(tracer)))
if err != nil {
return err
ch <- struct{}{}
}()
select {
case <-c.ctx.Done():
return retry.NoRetryError(errors.New("context canceled"))
case <-ch:
}
if err == nil {
c.conn = conn
return nil
}
return err
}
err = retry.Retry(c.reconnTry, 500*time.Millisecond, connectGrpcFunc)
......@@ -143,18 +169,31 @@ func (c *GrpcClient) Register() error {
}
func (c *GrpcClient) recall(caller func() (interface{}, error)) (interface{}, error) {
ret, err := caller()
ch := make(chan struct{}, 1)
var ret interface{}
var err error
go func() {
ret, err = caller()
if err == nil {
return ret, nil
ch <- struct{}{}
return
}
for i := 0; i < c.recallTry; i++ {
err = c.connect()
if err == nil {
ret, err = caller()
if err == nil {
return ret, nil
ch <- struct{}{}
return
}
}
}
ch <- struct{}{}
}()
select {
case <-c.ctx.Done():
return nil, errors.New("context canceled")
case <-ch:
}
return ret, err
}
......
......@@ -255,7 +255,7 @@ func TestGrpcService(t *testing.T) {
svr.masterService.UpdateStateCode(internalpb.StateCode_Healthy)
cli, err := grpcmasterserviceclient.NewClient(cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second)
cli, err := grpcmasterserviceclient.NewClient(context.Background(), cms.Params.MetaRootPath, cms.Params.EtcdEndpoints, 3*time.Second)
assert.Nil(t, err)
err = cli.Init()
......
......@@ -127,7 +127,6 @@ func (s *Server) Run() error {
}
func (s *Server) init() error {
ctx := context.Background()
var err error
Params.Init()
if !funcutil.CheckPortAvailable(Params.Port) {
......@@ -171,7 +170,7 @@ func (s *Server) init() error {
masterServiceAddr := Params.MasterAddress
log.Debug("ProxyNode", zap.String("master address", masterServiceAddr))
timeout := 3 * time.Second
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout)
s.masterServiceClient, err = grpcmasterserviceclient.NewClient(s.ctx, proxynode.Params.MetaRootPath, proxynode.Params.EtcdEndpoints, timeout)
if err != nil {
log.Debug("ProxyNode new masterServiceClient failed ", zap.Error(err))
return err
......@@ -181,7 +180,7 @@ func (s *Server) init() error {
log.Debug("ProxyNode new masterServiceClient Init ", zap.Error(err))
return err
}
err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentHealthy(s.ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("ProxyNode WaitForComponentHealthy master service failed ", zap.Error(err))
......
......@@ -78,7 +78,6 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
}
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
Params.LoadFromEnv()
Params.LoadFromArgs()
......@@ -122,7 +121,7 @@ func (s *Server) init() error {
}
log.Debug("QueryNode start to wait for QueryService ready")
err = funcutil.WaitForComponentInitOrHealthy(ctx, queryService, "QueryService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentInitOrHealthy(s.ctx, queryService, "QueryService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for QueryService ready failed", zap.Error(err))
panic(err)
......@@ -138,7 +137,7 @@ func (s *Server) init() error {
addr := Params.MasterAddress
log.Debug("QueryNode start to new MasterServiceClient", zap.Any("QueryServiceAddress", addr))
masterService, err := msc.NewClient(qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
masterService, err := msc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints, 3*time.Second)
if err != nil {
log.Debug("QueryNode new MasterServiceClient failed", zap.Error(err))
panic(err)
......@@ -154,7 +153,7 @@ func (s *Server) init() error {
panic(err)
}
log.Debug("QueryNode start to wait for MasterService ready")
err = funcutil.WaitForComponentHealthy(ctx, masterService, "MasterService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentHealthy(s.ctx, masterService, "MasterService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for MasterService ready failed", zap.Error(err))
panic(err)
......@@ -180,7 +179,7 @@ func (s *Server) init() error {
}
// wait IndexService healthy
log.Debug("QueryNode start to wait for IndexService ready")
err = funcutil.WaitForComponentHealthy(ctx, indexService, "IndexService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentHealthy(s.ctx, indexService, "IndexService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for IndexService ready failed", zap.Error(err))
panic(err)
......@@ -203,7 +202,7 @@ func (s *Server) init() error {
panic(err)
}
log.Debug("QueryNode start to wait for DataService ready")
err = funcutil.WaitForComponentInitOrHealthy(ctx, dataService, "DataService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentInitOrHealthy(s.ctx, dataService, "DataService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for DataService ready failed", zap.Error(err))
panic(err)
......
......@@ -88,7 +88,6 @@ func (s *Server) Run() error {
}
func (s *Server) init() error {
ctx := context.Background()
Params.Init()
qs.Params.Init()
qs.Params.Port = Params.Port
......@@ -109,7 +108,7 @@ func (s *Server) init() error {
// --- Master Server Client ---
log.Debug("QueryService try to new MasterService client", zap.Any("MasterServiceAddress", Params.MasterAddress))
masterService, err := msc.NewClient(qs.Params.MetaRootPath, qs.Params.EtcdEndpoints, 3*time.Second)
masterService, err := msc.NewClient(s.loopCtx, qs.Params.MetaRootPath, qs.Params.EtcdEndpoints, 3*time.Second)
if err != nil {
log.Debug("QueryService try to new MasterService client failed", zap.Error(err))
panic(err)
......@@ -126,7 +125,7 @@ func (s *Server) init() error {
}
// wait for master init or healthy
log.Debug("QueryService try to wait for MasterService ready")
err = funcutil.WaitForComponentInitOrHealthy(ctx, masterService, "MasterService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, masterService, "MasterService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryService wait for MasterService ready failed", zap.Error(err))
panic(err)
......@@ -150,7 +149,7 @@ func (s *Server) init() error {
panic(err)
}
log.Debug("QueryService try to wait for DataService ready")
err = funcutil.WaitForComponentInitOrHealthy(ctx, dataService, "DataService", 1000000, time.Millisecond*200)
err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, dataService, "DataService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryService wait for DataService ready failed", zap.Error(err))
panic(err)
......
......@@ -101,12 +101,10 @@ func (node *ProxyNode) Register() error {
}
func (node *ProxyNode) Init() error {
ctx := context.Background()
// wait for dataservice state changed to Healthy
if node.dataService != nil {
log.Debug("ProxyNode wait for dataService ready")
err := funcutil.WaitForComponentHealthy(ctx, node.dataService, "DataService", 1000000, time.Millisecond*200)
err := funcutil.WaitForComponentHealthy(node.ctx, node.dataService, "DataService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("ProxyNode wait for dataService ready failed", zap.Error(err))
return err
......@@ -117,7 +115,7 @@ func (node *ProxyNode) Init() error {
// wait for queryService state changed to Healthy
if node.queryService != nil {
log.Debug("ProxyNode wait for queryService ready")
err := funcutil.WaitForComponentHealthy(ctx, node.queryService, "QueryService", 1000000, time.Millisecond*200)
err := funcutil.WaitForComponentHealthy(node.ctx, node.queryService, "QueryService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("ProxyNode wait for queryService ready failed", zap.Error(err))
return err
......@@ -128,7 +126,7 @@ func (node *ProxyNode) Init() error {
// wait for indexservice state changed to Healthy
if node.indexService != nil {
log.Debug("ProxyNode wait for indexService ready")
err := funcutil.WaitForComponentHealthy(ctx, node.indexService, "IndexService", 1000000, time.Millisecond*200)
err := funcutil.WaitForComponentHealthy(node.ctx, node.indexService, "IndexService", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("ProxyNode wait for indexService ready failed", zap.Error(err))
return err
......@@ -137,7 +135,7 @@ func (node *ProxyNode) Init() error {
}
if node.queryService != nil {
resp, err := node.queryService.CreateQueryChannel(ctx, &querypb.CreateQueryChannelRequest{})
resp, err := node.queryService.CreateQueryChannel(node.ctx, &querypb.CreateQueryChannelRequest{})
if err != nil {
log.Debug("ProxyNode CreateQueryChannel failed", zap.Error(err))
return err
......@@ -187,7 +185,7 @@ func (node *ProxyNode) Init() error {
node.idAllocator = idAllocator
node.idAllocator.PeerID = Params.ProxyID
tsoAllocator, err := NewTimestampAllocator(node.masterService, Params.ProxyID)
tsoAllocator, err := NewTimestampAllocator(node.ctx, node.masterService, Params.ProxyID)
if err != nil {
return err
}
......
......@@ -22,12 +22,14 @@ import (
)
type TimestampAllocator struct {
ctx context.Context
masterService types.MasterService
peerID UniqueID
}
func NewTimestampAllocator(master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) {
func NewTimestampAllocator(ctx context.Context, master types.MasterService, peerID UniqueID) (*TimestampAllocator, error) {
a := &TimestampAllocator{
ctx: ctx,
peerID: peerID,
masterService: master,
}
......@@ -35,7 +37,7 @@ func NewTimestampAllocator(master types.MasterService, peerID UniqueID) (*Timest
}
func (ta *TimestampAllocator) Alloc(count uint32) ([]Timestamp, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(ta.ctx, 5*time.Second)
req := &masterpb.AllocTimestampRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_RequestTSO,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册