diff --git a/internal/datacoord/channel_checker.go b/internal/datacoord/channel_checker.go index 8915bf52828e5ab7bbc68da965048c0bf6a02bdc..4246e588507fd6a4a45fe9abe17c2dcb5d153ef1 100644 --- a/internal/datacoord/channel_checker.go +++ b/internal/datacoord/channel_checker.go @@ -101,14 +101,16 @@ func (c *channelStateTimer) startOne(watchState datapb.ChannelWatchState, channe select { case <-time.NewTimer(time.Until(timeoutT)).C: log.Info("timeout and stop timer: wait for channel ACK timeout", - zap.String("state", watchState.String()), + zap.String("watch state", watchState.String()), + zap.Int64("nodeID", nodeID), zap.String("channel name", channelName), zap.Time("timeout time", timeoutT)) ackType := getAckType(watchState) c.notifyTimeoutWatcher(&ackEvent{ackType, channelName, nodeID}) case <-stop: log.Debug("stop timer before timeout", - zap.String("state", watchState.String()), + zap.String("watch state", watchState.String()), + zap.Int64("nodeID", nodeID), zap.String("channel name", channelName), zap.Time("timeout time", timeoutT)) } diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 7313500106e44e8f4fda33da78d61d033434c0dd..4f42e4de746fc239858ddd6a40a45be22c276642 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -121,7 +121,8 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error { } // Process watch states for old nodes. - if err := c.checkOldNodes(oNodes); err != nil { + oldOnLines := c.getOldOnlines(nodes, oNodes) + if err := c.checkOldNodes(oldOnLines); err != nil { return err } @@ -152,8 +153,9 @@ func (c *ChannelManager) Startup(ctx context.Context, nodes []int64) error { } log.Info("cluster start up", - zap.Any("nodes", nodes), - zap.Any("oNodes", oNodes), + zap.Int64s("nodes", nodes), + zap.Int64s("oNodes", oNodes), + zap.Int64s("old onlines", oldOnLines), zap.Int64s("new onlines", newOnLines), zap.Int64s("offLines", offLines)) return nil @@ -176,6 +178,10 @@ func (c *ChannelManager) checkOldNodes(nodes []UniqueID) error { for _, info := range watchInfos { channelName := info.GetVchan().GetChannelName() + log.Debug("processing watch info", + zap.String("watch state", info.GetState().String()), + zap.String("channel name", channelName)) + switch info.GetState() { case datapb.ChannelWatchState_ToWatch, datapb.ChannelWatchState_Uncomplete: c.stateTimer.startOne(datapb.ChannelWatchState_ToWatch, channelName, nodeID, info.GetTimeoutTs()) @@ -257,6 +263,21 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) { } } +// getOldOnlines returns a list of old online node ids in `old` and in `curr`. +func (c *ChannelManager) getOldOnlines(curr []int64, old []int64) []int64 { + mcurr := make(map[int64]struct{}) + ret := make([]int64, 0, len(old)) + for _, n := range curr { + mcurr[n] = struct{}{} + } + for _, n := range old { + if _, found := mcurr[n]; found { + ret = append(ret, n) + } + } + return ret +} + // getNewOnLines returns a list of new online node ids in `curr` but not in `old`. func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 { mold := make(map[int64]struct{}) @@ -571,21 +592,21 @@ func (c *ChannelManager) processAck(e *ackEvent) { err := c.Release(e.nodeID, e.channelName) if err != nil { log.Warn("fail to set channels to release for watch failure ACKs", - zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName)) + zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err)) } case releaseFailAck, releaseTimeoutAck: // failure acks from toRelease err := c.cleanUpAndDelete(e.nodeID, e.channelName) if err != nil { log.Warn("fail to clean and delete channels for release failure ACKs", - zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName)) + zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err)) } case releaseSuccessAck: err := c.toDelete(e.nodeID, e.channelName) if err != nil { log.Warn("fail to response to release success ACK", - zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName)) + zap.Int64("nodeID", e.nodeID), zap.String("channel name", e.channelName), zap.Error(err)) } } } diff --git a/internal/datacoord/channel_manager_test.go b/internal/datacoord/channel_manager_test.go index 1a212faec084d533ffc0b2643f9d1945760b8132..ea04a44eec3f1958fcfe8dd8e85b44f8a5fba4e9 100644 --- a/internal/datacoord/channel_manager_test.go +++ b/internal/datacoord/channel_manager_test.go @@ -788,3 +788,31 @@ func TestChannelManager_RemoveChannel(t *testing.T) { }) } } + +func TestChannelManager_HelperFunc(t *testing.T) { + c := &ChannelManager{} + t.Run("test getOldOnlines", func(t *testing.T) { + tests := []struct { + nodes []int64 + oNodes []int64 + + expectedOut []int64 + desription string + }{ + {[]int64{}, []int64{}, []int64{}, "empty both"}, + {[]int64{1}, []int64{}, []int64{}, "empty oNodes"}, + {[]int64{}, []int64{1}, []int64{}, "empty nodes"}, + {[]int64{1}, []int64{1}, []int64{1}, "same one"}, + {[]int64{1, 2}, []int64{1}, []int64{1}, "same one 2"}, + {[]int64{1}, []int64{1, 2}, []int64{1}, "same one 3"}, + {[]int64{1, 2}, []int64{1, 2}, []int64{1, 2}, "same two"}, + } + + for _, test := range tests { + t.Run(test.desription, func(t *testing.T) { + nodes := c.getOldOnlines(test.nodes, test.oNodes) + assert.ElementsMatch(t, test.expectedOut, nodes) + }) + } + }) +} diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 699894e9b5b3536bdf9a6afaa07110dbed5564ea..d3ee112b568a1e33e5314cff0607d4292c3b067c 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -117,6 +117,11 @@ func (s *Session) Init(serverName, address string, exclusive bool, triggerKill b s.ServerID = serverID } +// String makes Session struct able to be logged by zap +func (s *Session) String() string { + return fmt.Sprintf("Session:", s.ServerID, s.ServerName) +} + // Register will process keepAliveResponse to keep alive with etcd. func (s *Session) Register() { ch, err := s.registerService() diff --git a/internal/util/sessionutil/session_util_test.go b/internal/util/sessionutil/session_util_test.go index f5451717919668fe8934e205a36519615754ea87..523805a0560429127820959485f94846f991c08d 100644 --- a/internal/util/sessionutil/session_util_test.go +++ b/internal/util/sessionutil/session_util_test.go @@ -12,10 +12,12 @@ import ( "time" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" @@ -399,3 +401,8 @@ func TestSession_Registered(t *testing.T) { session.UpdateRegistered(true) assert.True(t, session.Registered()) } + +func TestSession_String(t *testing.T) { + s := &Session{} + log.Debug("log session", zap.Any("session", s)) +}