未验证 提交 43a9e175 编写于 作者: C cai.zhang 提交者: GitHub

Exit component process when session key is deleted (#21658) (#22164)

Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
上级 dc5abe08
......@@ -166,7 +166,7 @@ func (r *Runner) CheckSessions() error {
func (r *Runner) RegisterSession() error {
r.session.Register()
go r.session.LivenessCheck(r.ctx, func() {})
r.session.LivenessCheck(r.ctx, func() {})
return nil
}
......
......@@ -249,8 +249,9 @@ func (s *Server) Register() error {
return err
}
}
go s.session.LivenessCheck(s.serverLoopCtx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", paramtable.GetNodeID()))
s.session.LivenessCheck(s.serverLoopCtx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
if err := s.Stop(); err != nil {
logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
}
......@@ -928,13 +929,17 @@ func (s *Server) Stop() error {
s.cluster.Close()
s.garbageCollector.close()
s.stopServerLoop()
s.session.Revoke(time.Second)
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
s.indexBuilder.Stop()
if s.session != nil {
s.session.Stop()
}
return nil
}
......
......@@ -3947,7 +3947,6 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {
var err error
paramtable.Get().Save(Params.CommonCfg.DataCoordTimeTick.Key, Params.CommonCfg.DataCoordTimeTick.GetValue()+strconv.Itoa(rand.Int()))
factory := dependency.NewDefaultFactory(true)
etcdCli, err := etcd.GetEtcdClient(
Params.EtcdCfg.UseEmbedEtcd.GetAsBool(),
Params.EtcdCfg.EtcdUseSSL.GetAsBool(),
......@@ -4077,10 +4076,6 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server {
assert.Nil(t, err)
err = svr.Start()
assert.Nil(t, err)
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
assert.Nil(t, err)
err = svr.Register()
assert.Nil(t, err)
......@@ -4196,6 +4191,7 @@ func Test_CheckHealth(t *testing.T) {
func Test_newChunkManagerFactory(t *testing.T) {
server := newTestServer2(t, nil)
paramtable.Get().Save(Params.DataCoordCfg.EnableGarbageCollection.Key, "true")
defer closeTestServer(t, server)
t.Run("err_minio_bad_address", func(t *testing.T) {
paramtable.Get().Save(Params.CommonCfg.StorageType.Key, "minio")
......@@ -4221,6 +4217,7 @@ func Test_initGarbageCollection(t *testing.T) {
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableGarbageCollection.Key)
server := newTestServer2(t, nil)
defer closeTestServer(t, server)
t.Run("ok", func(t *testing.T) {
storageCli, err := server.newChunkManagerFactory()
......
......@@ -193,7 +193,7 @@ func (node *DataNode) Register() error {
node.session.Register()
// Start liveness check
go node.session.LivenessCheck(node.ctx, func() {
node.session.LivenessCheck(node.ctx, func() {
log.Error("Data Node disconnected from etcd, process will exit", zap.Int64("Server Id", node.GetSession().ServerID))
if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
......@@ -580,7 +580,9 @@ func (node *DataNode) Stop() error {
}
}
node.session.Revoke(time.Second)
if node.session != nil {
node.session.Stop()
}
return nil
}
......
......@@ -122,7 +122,7 @@ func (i *IndexNode) Register() error {
i.session.Register()
//start liveness check
go i.session.LivenessCheck(i.loopCtx, func() {
i.session.LivenessCheck(i.loopCtx, func() {
log.Error("Index Node disconnected from etcd, process will exit", zap.Int64("Server Id", i.session.ServerID))
if err := i.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
......@@ -240,7 +240,9 @@ func (i *IndexNode) Stop() error {
if i.sched != nil {
i.sched.Close()
}
i.session.Revoke(time.Second)
if i.session != nil {
i.session.Stop()
}
log.Info("Index node stopped.")
})
......
......@@ -132,7 +132,7 @@ func NewProxy(ctx context.Context, factory dependency.Factory) (*Proxy, error) {
// Register registers proxy at etcd
func (node *Proxy) Register() error {
node.session.Register()
go node.session.LivenessCheck(node.ctx, func() {
node.session.LivenessCheck(node.ctx, func() {
log.Error("Proxy disconnected from etcd, process will exit", zap.Int64("Server Id", node.session.ServerID))
if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
......@@ -420,7 +420,9 @@ func (node *Proxy) Stop() error {
cb()
}
node.session.Revoke(time.Second)
if node.session != nil {
node.session.Stop()
}
if node.shardMgr != nil {
node.shardMgr.Close()
......
......@@ -22,7 +22,6 @@ import (
"os"
"sync"
"syscall"
"time"
"github.com/cockroachdb/errors"
clientv3 "go.etcd.io/etcd/client/v3"
......@@ -134,8 +133,9 @@ func (s *Server) Register() error {
return err
}
}
go s.session.LivenessCheck(s.ctx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", paramtable.GetNodeID()))
s.session.LivenessCheck(s.ctx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID))
if err := s.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
}
......@@ -425,7 +425,7 @@ func (s *Server) startServerLoop() {
func (s *Server) Stop() error {
s.cancel()
if s.session != nil {
s.session.Revoke(time.Second)
s.session.Stop()
}
if s.session != nil {
......
......@@ -156,7 +156,7 @@ func (node *QueryNode) initSession() error {
func (node *QueryNode) Register() error {
node.session.Register()
// start liveness check
go node.session.LivenessCheck(node.ctx, func() {
node.session.LivenessCheck(node.ctx, func() {
log.Error("Query Node disconnected from etcd, process will exit", zap.Int64("Server Id", paramtable.GetNodeID()))
if err := node.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
......
......@@ -278,7 +278,7 @@ func (c *Core) Register() error {
}
}
log.Info("RootCoord Register Finished")
go c.session.LivenessCheck(c.ctx, func() {
c.session.LivenessCheck(c.ctx, func() {
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
if err := c.Stop(); err != nil {
log.Fatal("failed to stop server", zap.Error(err))
......@@ -674,7 +674,7 @@ func (c *Core) cancelIfNotNil() {
func (c *Core) revokeSession() {
if c.session != nil {
// wait at most one second to revoke
c.session.Revoke(time.Second)
c.session.Stop()
log.Info("revoke rootcoord session")
}
}
......
......@@ -95,9 +95,11 @@ type Session struct {
TriggerKill bool
Version semver.Version `json:"Version,omitempty"`
liveCh <-chan bool
etcdCli *clientv3.Client
leaseID *clientv3.LeaseID
liveCh <-chan bool
etcdCli *clientv3.Client
leaseID *clientv3.LeaseID
watchSessionKeyCh clientv3.WatchChan
wg sync.WaitGroup
metaRoot string
......@@ -342,6 +344,22 @@ func (s *Session) getCompleteKey() string {
return path.Join(s.metaRoot, DefaultServiceRoot, key)
}
func (s *Session) getSessionKey() string {
key := s.ServerName
if !s.Exclusive {
key = fmt.Sprintf("%s-%d", key, s.ServerID)
}
return path.Join(s.metaRoot, DefaultServiceRoot, key)
}
func (s *Session) initWatchSessionCh() {
getResp, err := s.etcdCli.Get(context.Background(), s.getSessionKey())
if err != nil {
panic(err)
}
s.watchSessionKeyCh = s.etcdCli.Watch(context.Background(), s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
}
// registerService registers the service to etcd so that other services
// can find that the service is online and issue subsequent operations
// RegisterService will save a key-value in etcd
......@@ -396,10 +414,7 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
log.Debug("put session key into etcd", zap.String("key", completeKey), zap.String("value", string(sessionJSON)))
keepAliveCtx, keepAliveCancel := context.WithCancel(context.Background())
s.keepAliveCancel = func() {
s.Revoke(time.Second)
keepAliveCancel()
}
s.keepAliveCancel = keepAliveCancel
ch, err = s.etcdCli.KeepAlive(keepAliveCtx, resp.ID)
if err != nil {
log.Warn("go error during keeping alive with etcd", zap.Error(err))
......@@ -419,7 +434,9 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
// If keepAlive fails for unexpected error, it will send a signal to the channel.
func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveResponse) (failChannel <-chan bool) {
failCh := make(chan bool)
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
select {
case <-s.ctx.Done():
......@@ -689,28 +706,81 @@ func (w *sessionWatcher) handleWatchErr(err error) error {
// ch is the liveness signal channel, ch is closed only when the session is expired
// callback is the function to call when ch is closed, note that callback will not be invoked when loop exits due to context
func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
for {
select {
case _, ok := <-s.liveCh:
// ok, still alive
if ok {
continue
}
// not ok, connection lost
log.Warn("connection lost detected, shuting down")
if callback != nil {
go callback()
}
return
case <-ctx.Done():
log.Debug("liveness exits due to context done")
// cancel the etcd keepAlive context
if s.keepAliveCancel != nil {
s.keepAliveCancel()
s.initWatchSessionCh()
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
select {
case _, ok := <-s.liveCh:
// ok, still alive
if ok {
continue
}
// not ok, connection lost
log.Warn("connection lost detected, shuting down")
if callback != nil {
go callback()
}
return
case <-ctx.Done():
log.Debug("liveness exits due to context done")
// cancel the etcd keepAlive context
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
return
case resp, ok := <-s.watchSessionKeyCh:
if !ok {
log.Warn("watch session key channel closed")
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
return
}
if resp.Err() != nil {
// if not ErrCompacted, just close the channel
if resp.Err() != v3rpc.ErrCompacted {
//close event channel
log.Warn("Watch service found error", zap.Error(resp.Err()))
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
return
}
log.Warn("Watch service found compacted error", zap.Error(resp.Err()))
getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey())
if err != nil || len(getResp.Kvs) == 0 {
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
return
}
s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
continue
}
for _, event := range resp.Events {
switch event.Type {
case mvccpb.PUT:
log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
case mvccpb.DELETE:
log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
}
}
}
return
}
}()
}
func (s *Session) Stop() {
s.Revoke(time.Second)
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.wg.Wait()
}
// Revoke revokes the internal leaseID for the session key
......
......@@ -183,7 +183,19 @@ func TestUpdateSessions(t *testing.T) {
}
func TestSessionLivenessCheck(t *testing.T) {
s := &Session{}
paramtable.Init()
params := paramtable.Get()
endpoints := params.GetWithDefault("etcd.endpoints", paramtable.DefaultEtcdEndpoints)
metaRoot := fmt.Sprintf("%d/%s", rand.Int(), DefaultServiceRoot)
etcdEndpoints := strings.Split(endpoints, ",")
etcdCli, err := etcd.GetRemoteEtcdClient(etcdEndpoints)
require.NoError(t, err)
s := &Session{
etcdCli: etcdCli,
metaRoot: metaRoot,
}
ctx := context.Background()
ch := make(chan bool)
s.liveCh = ch
......@@ -191,7 +203,7 @@ func TestSessionLivenessCheck(t *testing.T) {
flag := false
go s.LivenessCheck(ctx, func() {
s.LivenessCheck(ctx, func() {
flag = true
signal <- struct{}{}
})
......@@ -211,7 +223,7 @@ func TestSessionLivenessCheck(t *testing.T) {
s.liveCh = ch
flag = false
go s.LivenessCheck(ctx, func() {
s.LivenessCheck(ctx, func() {
flag = true
signal <- struct{}{}
})
......@@ -648,7 +660,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
wg.Done()
return nil
})
go s1.LivenessCheck(ctx1, func() {
s1.LivenessCheck(ctx1, func() {
flag = true
signal <- struct{}{}
s1.keepAliveCancel()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册