未验证 提交 7dfc8fbf 编写于 作者: C congqixia 提交者: GitHub

Fix data race on keepAliveCancel (#26087)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 830f0678
......@@ -455,9 +455,7 @@ func (s *Session) processKeepAliveResponse(ch <-chan *clientv3.LeaseKeepAliveRes
select {
case <-s.ctx.Done():
log.Warn("keep alive", zap.Error(errors.New("context done")))
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.cancelKeepAlive()
return
case resp, ok := <-ch:
if !ok {
......@@ -803,16 +801,12 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
case <-ctx.Done():
log.Warn("liveness exits due to context done")
// cancel the etcd keepAlive context
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.cancelKeepAlive()
return
case resp, ok := <-s.watchSessionKeyCh:
if !ok {
log.Warn("watch session key channel closed")
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.cancelKeepAlive()
return
}
if resp.Err() != nil {
......@@ -820,17 +814,13 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
if resp.Err() != v3rpc.ErrCompacted {
//close event channel
log.Warn("Watch service found error", zap.Error(resp.Err()))
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.cancelKeepAlive()
return
}
log.Warn("Watch service found compacted error", zap.Error(resp.Err()))
getResp, err := s.etcdCli.Get(s.ctx, s.getSessionKey())
if err != nil || len(getResp.Kvs) == 0 {
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.cancelKeepAlive()
return
}
s.watchSessionKeyCh = s.etcdCli.Watch(s.ctx, s.getSessionKey(), clientv3.WithRev(getResp.Header.Revision))
......@@ -842,9 +832,7 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
log.Info("register session success", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
case mvccpb.DELETE:
log.Info("session key is deleted, exit...", zap.String("role", s.ServerName), zap.String("key", string(event.Kv.Key)))
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
s.cancelKeepAlive()
}
}
}
......@@ -852,11 +840,17 @@ func (s *Session) LivenessCheck(ctx context.Context, callback func()) {
}()
}
func (s *Session) Stop() {
s.Revoke(time.Second)
func (s *Session) cancelKeepAlive() {
s.keepAliveLock.Lock()
defer s.keepAliveLock.Unlock()
if s.keepAliveCancel != nil {
s.keepAliveCancel()
}
}
func (s *Session) Stop() {
s.Revoke(time.Second)
s.cancelKeepAlive()
s.wg.Wait()
}
......
......@@ -625,7 +625,7 @@ func TestSessionProcessActiveStandBy(t *testing.T) {
s1.LivenessCheck(ctx1, func() {
flag = true
signal <- struct{}{}
s1.keepAliveCancel()
s1.cancelKeepAlive()
})
assert.False(t, s1.isStandby.Load().(bool))
......@@ -875,7 +875,7 @@ func (s *SessionSuite) TestKeepAliveRetryActiveCancel() {
session.processKeepAliveResponse(ch)
session.LivenessCheck(ctx, nil)
// active cancel, should not retry connect
session.keepAliveCancel()
session.cancelKeepAlive()
// sleep a while wait goroutine process
time.Sleep(time.Millisecond * 100)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册