diff --git a/internal/rootcoord/proxy_client_manager.go b/internal/rootcoord/proxy_client_manager.go index 92dd1802f6f9b47fdf0476054fa6556a164e36c1..696af9ea86d68027018af1f1650b66d74eda1af7 100644 --- a/internal/rootcoord/proxy_client_manager.go +++ b/internal/rootcoord/proxy_client_manager.go @@ -26,61 +26,73 @@ import ( type proxyClientManager struct { core *Core - lock sync.Mutex + lock sync.RWMutex proxyClient map[int64]types.Proxy + helper proxyClientManagerHelper +} + +type proxyClientManagerHelper struct { + afterConnect func() +} + +var defaultClientManagerHelper = proxyClientManagerHelper{ + afterConnect: func() {}, } func newProxyClientManager(c *Core) *proxyClientManager { return &proxyClientManager{ core: c, - lock: sync.Mutex{}, proxyClient: make(map[int64]types.Proxy), + helper: defaultClientManagerHelper, } } -func (p *proxyClientManager) GetProxyClients(sess []*sessionutil.Session) { - p.lock.Lock() - defer p.lock.Unlock() - var pl map[int64]*sessionutil.Session - for _, s := range sess { - if _, ok := p.proxyClient[s.ServerID]; ok { - continue - } - if len(pl) > 0 { - if _, ok := pl[s.ServerID]; !ok { - continue - } - } - - pc, err := p.core.NewProxyClient(s) - if err != nil { - log.Debug("create proxy client failed", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID), zap.Error(err)) - pl, _ = listProxyInEtcd(p.core.ctx, p.core.etcdCli) - continue - } - p.proxyClient[s.ServerID] = pc - log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID)) +func (p *proxyClientManager) GetProxyClients(sessions []*sessionutil.Session) { + for _, session := range sessions { + p.AddProxyClient(session) } } -func (p *proxyClientManager) AddProxyClient(s *sessionutil.Session) { - p.lock.Lock() - defer p.lock.Unlock() - if _, ok := p.proxyClient[s.ServerID]; ok { +func (p *proxyClientManager) AddProxyClient(session *sessionutil.Session) { + p.lock.RLock() + _, ok := p.proxyClient[session.ServerID] + p.lock.RUnlock() + if ok { return } - pc, err := p.core.NewProxyClient(s) + + go p.connect(session) +} + +func (p *proxyClientManager) connect(session *sessionutil.Session) { + pc, err := p.core.NewProxyClient(session) if err != nil { - log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID), zap.Error(err)) + log.Warn("failed to create proxy client", zap.String("address", session.Address), zap.Int64("serverID", session.ServerID), zap.Error(err)) return } - p.proxyClient[s.ServerID] = pc - log.Debug("create proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID)) + + p.lock.Lock() + defer p.lock.Unlock() + + _, ok := p.proxyClient[session.ServerID] + if ok { + pc.Stop() + return + } + p.proxyClient[session.ServerID] = pc + log.Debug("succeed to create proxy client", zap.String("address", session.Address), zap.Int64("serverID", session.ServerID)) + p.helper.afterConnect() } func (p *proxyClientManager) DelProxyClient(s *sessionutil.Session) { p.lock.Lock() defer p.lock.Unlock() + + cli, ok := p.proxyClient[s.ServerID] + if ok { + cli.Stop() + } + delete(p.proxyClient, s.ServerID) log.Debug("remove proxy client", zap.String("proxy address", s.Address), zap.Int64("proxy id", s.ServerID)) } diff --git a/internal/rootcoord/proxy_client_manager_test.go b/internal/rootcoord/proxy_client_manager_test.go index 8fb88ac05129870552a55e0625b249570d7a4012..bf864c07746cb5dd7287bd3fa987cce8db6cb598 100644 --- a/internal/rootcoord/proxy_client_manager_test.go +++ b/internal/rootcoord/proxy_client_manager_test.go @@ -115,6 +115,11 @@ func TestProxyClientManager_ReleaseDQLMessageStream(t *testing.T) { pcm := newProxyClientManager(core) + ch := make(chan struct{}) + pcm.helper = proxyClientManagerHelper{ + afterConnect: func() { ch <- struct{}{} }, + } + pcm.ReleaseDQLMessageStream(ctx, nil) core.SetNewProxyClient( @@ -129,6 +134,7 @@ func TestProxyClientManager_ReleaseDQLMessageStream(t *testing.T) { } pcm.AddProxyClient(session) + <-ch assert.Panics(t, func() { pcm.ReleaseDQLMessageStream(ctx, nil) }) } diff --git a/internal/rootcoord/proxy_manager.go b/internal/rootcoord/proxy_manager.go index 46989b4d9c974ab3ab3643209cae4a223aba40d3..e0406b2549c6c114b8cf0684c784eebf93163c0d 100644 --- a/internal/rootcoord/proxy_manager.go +++ b/internal/rootcoord/proxy_manager.go @@ -73,100 +73,121 @@ func (p *proxyManager) DelSession(fns ...func(*sessionutil.Session)) { // WatchProxy starts a goroutine to watch proxy session changes on etcd func (p *proxyManager) WatchProxy() error { - ctx2, cancel := context.WithTimeout(p.ctx, RequestTimeout) + ctx, cancel := context.WithTimeout(p.ctx, RequestTimeout) defer cancel() - resp, err := p.etcdCli.Get( - ctx2, + + sessions, rev, err := p.getSessionsOnEtcd(ctx) + if err != nil { + return err + } + log.Debug("succeed to get sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev)) + for _, f := range p.getSessions { + f(sessions) + } + + eventCh := p.etcdCli.Watch( + p.ctx, path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), clientv3.WithPrefix(), - clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + clientv3.WithCreatedNotify(), + clientv3.WithPrevKV(), + clientv3.WithRev(rev+1), ) - if err != nil { - return fmt.Errorf("proxyManager, watch proxy failed, error = %w", err) - } + go p.startWatchEtcd(p.ctx, eventCh) + return nil +} - go func() { - sessions := []*sessionutil.Session{} - for _, v := range resp.Kvs { - sess := new(sessionutil.Session) - err := json.Unmarshal(v.Value, sess) - if err != nil { - log.Debug("unmarshal SvrSession failed", zap.Error(err)) - continue +func (p *proxyManager) startWatchEtcd(ctx context.Context, eventCh clientv3.WatchChan) { + log.Debug("start to watch etcd") + for { + select { + case <-ctx.Done(): + log.Warn("stop watching etcd loop") + return + case event, ok := <-eventCh: + if !ok { + log.Warn("stop watching etcd loop due to closed etcd event channel") + return } - sessions = append(sessions, sess) - } - for _, f := range p.getSessions { - f(sessions) - } - for _, s := range sessions { - metrics.RootCoordProxyLister.WithLabelValues(metricProxy(s.ServerID)).Set(1) - log.Debug("Get proxy", zap.Int64("id", s.ServerID), zap.String("addr", s.Address), zap.String("name", s.ServerName)) - } - - rch := p.etcdCli.Watch( - p.ctx, - path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), - clientv3.WithPrefix(), - clientv3.WithCreatedNotify(), - clientv3.WithPrevKV(), - clientv3.WithRev(resp.Header.Revision+1), - ) - for { - select { - case <-p.ctx.Done(): - log.Debug("context done", zap.Error(p.ctx.Err())) + if err := event.Err(); err != nil { + log.Error("received error event from etcd watcher", zap.Error(err)) return - case wresp, ok := <-rch: - if !ok { - log.Debug("watch proxy failed") - return + } + for _, e := range event.Events { + var err error + switch e.Type { + case mvccpb.PUT: + err = p.handlePutEvent(e) + case mvccpb.DELETE: + err = p.handleDeleteEvent(e) } - pl, _ := listProxyInEtcd(p.ctx, p.etcdCli) - for _, ev := range wresp.Events { - switch ev.Type { - case mvccpb.PUT: - sess := new(sessionutil.Session) - err := json.Unmarshal(ev.Kv.Value, sess) - if err != nil { - log.Debug("watch proxy, unmarshal failed", zap.Error(err)) - continue - } - if len(pl) > 0 { - if _, ok := pl[sess.ServerID]; !ok { - continue - } - } - p.lock.Lock() - log.Debug("watchProxy detect PUT event", zap.Int64("serverID", sess.ServerID)) - for _, f := range p.addSessions { - f(sess) - } - p.lock.Unlock() - metrics.RootCoordProxyLister.WithLabelValues(metricProxy(sess.ServerID)).Set(1) - case mvccpb.DELETE: - sess := new(sessionutil.Session) - err := json.Unmarshal(ev.PrevKv.Value, sess) - if err != nil { - log.Debug("watch proxy, unmarshal failed", zap.Error(err)) - continue - } - p.lock.Lock() - log.Debug("watchProxy detect DELETE event", zap.Int64("serverID", sess.ServerID)) - for _, f := range p.delSessions { - f(sess) - } - p.lock.Unlock() - metrics.RootCoordProxyLister.WithLabelValues(metricProxy(sess.ServerID)).Set(0) - } + if err != nil { + log.Warn("failed to handle proxy event", zap.Any("event", e), zap.Error(err)) } } } - }() + } +} + +func (p *proxyManager) handlePutEvent(e *clientv3.Event) error { + session, err := p.parseSession(e.Kv.Value) + if err != nil { + return err + } + log.Debug("received proxy put event with session", zap.Any("session", session)) + for _, f := range p.addSessions { + f(session) + } + metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Set(1) + return nil +} +func (p *proxyManager) handleDeleteEvent(e *clientv3.Event) error { + session, err := p.parseSession(e.PrevKv.Value) + if err != nil { + return err + } + log.Debug("received proxy delete event with session", zap.Any("session", session)) + for _, f := range p.delSessions { + f(session) + } + metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Set(0) return nil } +func (p *proxyManager) parseSession(value []byte) (*sessionutil.Session, error) { + session := new(sessionutil.Session) + err := json.Unmarshal(value, session) + if err != nil { + return nil, err + } + return session, nil +} + +func (p *proxyManager) getSessionsOnEtcd(ctx context.Context) ([]*sessionutil.Session, int64, error) { + resp, err := p.etcdCli.Get( + ctx, + path.Join(Params.MetaRootPath, sessionutil.DefaultServiceRoot, typeutil.ProxyRole), + clientv3.WithPrefix(), + clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend), + ) + if err != nil { + return nil, 0, fmt.Errorf("proxy manager failed to watch proxy with error %w", err) + } + + var sessions []*sessionutil.Session + for _, v := range resp.Kvs { + session, err := p.parseSession(v.Value) + if err != nil { + log.Debug("failed to unmarshal session", zap.Error(err)) + continue + } + sessions = append(sessions, session) + } + + return sessions, resp.Header.Revision, nil +} + // Stop stops the proxyManager func (p *proxyManager) Stop() { p.cancel() diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index a280cda2a4eee95890b9e6d38c8ed25c84efda98..e85d76a8936e437617d781c9f30ae32c0ed479f4 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -58,6 +58,10 @@ type proxyMock struct { mutex sync.Mutex } +func (p *proxyMock) Stop() error { + return nil +} + func (p *proxyMock) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) { p.mutex.Lock() defer p.mutex.Unlock()