From e3e3ac1525e0f8b210a0ff2df6f62f3256791e11 Mon Sep 17 00:00:00 2001 From: congqixia Date: Mon, 3 Apr 2023 16:10:24 +0800 Subject: [PATCH] Use peek segment info in querynodev2 (#23183) Signed-off-by: Congqi Xia --- internal/querynodev2/delegator/delegator.go | 7 +- .../delegator/delegator_data_test.go | 14 +-- .../querynodev2/delegator/delegator_test.go | 8 +- .../querynodev2/delegator/distribution.go | 23 +++-- .../delegator/distribution_test.go | 95 +++++++++++++++++++ .../querynodev2/delegator/mock_delegator.go | 37 +++++--- internal/querynodev2/delegator/snapshot.go | 18 ++-- internal/querynodev2/services.go | 4 +- internal/querynodev2/services_test.go | 9 +- 9 files changed, 160 insertions(+), 55 deletions(-) diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 357e5786a..9a6f7f9f5 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -75,7 +75,7 @@ func newLifetime() *lifetime { type ShardDelegator interface { Collection() int64 Version() int64 - GetDistribution() *distribution + GetSegmentInfo() (sealed []SnapshotItem, growing []SegmentEntry) SyncDistribution(ctx context.Context, entries ...SegmentEntry) Search(ctx context.Context, req *querypb.SearchRequest) ([]*internalpb.SearchResults, error) Query(ctx context.Context, req *querypb.QueryRequest) ([]*internalpb.RetrieveResults, error) @@ -161,8 +161,9 @@ func (sd *shardDelegator) Version() int64 { return sd.version } -func (sd *shardDelegator) GetDistribution() *distribution { - return sd.distribution +// GetSegmentInfo returns current segment distribution snapshot. +func (sd *shardDelegator) GetSegmentInfo() ([]SnapshotItem, []SegmentEntry) { + return sd.distribution.Peek() } // SyncDistribution revises distribution. diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 051501c38..93788041f 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -319,7 +319,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() { }) s.NoError(err) - sealed, _, _ := s.delegator.GetDistribution().GetCurrent() + sealed, _ := s.delegator.GetSegmentInfo() s.Require().Equal(1, len(sealed)) s.Equal(int64(1), sealed[0].NodeID) s.ElementsMatch([]SegmentEntry{ @@ -395,7 +395,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() { }) s.NoError(err) - sealed, _, _ := s.delegator.GetDistribution().GetCurrent() + sealed, _ := s.delegator.GetSegmentInfo() s.Require().Equal(1, len(sealed)) s.Equal(int64(1), sealed[0].NodeID) s.ElementsMatch([]SegmentEntry{ @@ -410,7 +410,6 @@ func (s *DelegatorDataSuite) TestLoadSegments() { PartitionID: 500, }, }, sealed[0].Segments) - }) s.Run("get_worker_fail", func() { @@ -599,8 +598,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { }) s.Require().NoError(err) - sealed, growing, version := s.delegator.GetDistribution().GetCurrent() - s.delegator.GetDistribution().FinishUsage(version) + sealed, growing := s.delegator.GetSegmentInfo() s.Require().Equal(1, len(sealed)) s.Equal(int64(1), sealed[0].NodeID) s.ElementsMatch([]SegmentEntry{ @@ -627,8 +625,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { }, false) s.NoError(err) - sealed, _, version = s.delegator.GetDistribution().GetCurrent() - s.delegator.GetDistribution().FinishUsage(version) + sealed, _ = s.delegator.GetSegmentInfo() s.Equal(0, len(sealed)) err = s.delegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{ @@ -639,8 +636,7 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { }, false) s.NoError(err) - _, growing, version = s.delegator.GetDistribution().GetCurrent() - s.delegator.GetDistribution().FinishUsage(version) + _, growing = s.delegator.GetSegmentInfo() s.Equal(0, len(growing)) err = s.delegator.ReleaseSegments(ctx, &querypb.ReleaseSegmentsRequest{ diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 5f61dc5c0..67d33d20a 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -142,9 +142,8 @@ func (s *DelegatorSuite) TestBasicInfo() { s.True(s.delegator.Serviceable()) } -func (s *DelegatorSuite) TestDistribution() { - sealed, growing, version := s.delegator.GetDistribution().GetCurrent() - s.delegator.GetDistribution().FinishUsage(version) +func (s *DelegatorSuite) TestGetSegmentInfo() { + sealed, growing := s.delegator.GetSegmentInfo() s.Equal(0, len(sealed)) s.Equal(0, len(growing)) @@ -155,7 +154,7 @@ func (s *DelegatorSuite) TestDistribution() { Version: 2001, }) - sealed, growing, version = s.delegator.GetDistribution().GetCurrent() + sealed, growing = s.delegator.GetSegmentInfo() s.EqualValues([]SnapshotItem{ { NodeID: 1, @@ -170,7 +169,6 @@ func (s *DelegatorSuite) TestDistribution() { }, }, sealed) s.Equal(0, len(growing)) - s.delegator.GetDistribution().FinishUsage(version) } func (s *DelegatorSuite) TestSearch() { diff --git a/internal/querynodev2/delegator/distribution.go b/internal/querynodev2/delegator/distribution.go index fb6dbd18c..682a98b0b 100644 --- a/internal/querynodev2/delegator/distribution.go +++ b/internal/querynodev2/delegator/distribution.go @@ -45,7 +45,7 @@ type distribution struct { snapshots *typeutil.ConcurrentMap[int64, *snapshot] // current is the snapshot for quick usage for search/query // generated for each change of distribution - current *snapshot + current *atomic.Pointer[snapshot] // protects current & segments mut sync.RWMutex } @@ -65,6 +65,7 @@ func NewDistribution() *distribution { growingSegments: make(map[UniqueID]SegmentEntry), sealedSegments: make(map[UniqueID]SegmentEntry), snapshots: typeutil.NewConcurrentMap[int64, *snapshot](), + current: atomic.NewPointer[snapshot](nil), } dist.genSnapshot() @@ -76,8 +77,9 @@ func (d *distribution) GetCurrent(partitions ...int64) (sealed []SnapshotItem, g d.mut.RLock() defer d.mut.RUnlock() - sealed, growing = d.current.Get(partitions...) - version = d.current.version + current := d.current.Load() + sealed, growing = current.Get(partitions...) + version = current.version return } @@ -89,6 +91,14 @@ func (d *distribution) FinishUsage(version int64) { } } +// Peek returns current snapshot without increasing inuse count +// show only used by GetDataDistribution. +func (d *distribution) Peek(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry) { + current := d.current.Load() + sealed, growing = current.Peek(partitions...) + return sealed, growing +} + // Serviceable returns wether current snapshot is serviceable. func (d *distribution) Serviceable() bool { return d.serviceable.Load() @@ -215,12 +225,13 @@ func (d *distribution) genSnapshot() chan struct{} { // stores last snapshot // ok to be nil - last := d.current + last := d.current.Load() // increase version d.version++ - d.current = NewSnapshot(dist, growing, last, d.version) + newSnapShot := NewSnapshot(dist, growing, last, d.version) + d.current.Store(newSnapShot) // shall be a new one - d.snapshots.GetOrInsert(d.version, d.current) + d.snapshots.GetOrInsert(d.version, newSnapShot) // first snapshot, return closed chan if last == nil { diff --git a/internal/querynodev2/delegator/distribution_test.go b/internal/querynodev2/delegator/distribution_test.go index 878c8b3fd..2e624c3dc 100644 --- a/internal/querynodev2/delegator/distribution_test.go +++ b/internal/querynodev2/delegator/distribution_test.go @@ -390,6 +390,101 @@ func (s *DistributionSuite) TestRemoveDistribution() { } } +func (s *DistributionSuite) TestPeek() { + type testCase struct { + tag string + input []SegmentEntry + expected []SnapshotItem + } + + cases := []testCase{ + { + tag: "one node", + input: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + { + NodeID: 1, + SegmentID: 2, + }, + }, + expected: []SnapshotItem{ + { + NodeID: 1, + Segments: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + { + NodeID: 1, + SegmentID: 2, + }, + }, + }, + }, + }, + { + tag: "multiple nodes", + input: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + { + NodeID: 2, + SegmentID: 2, + }, + { + NodeID: 1, + SegmentID: 3, + }, + }, + expected: []SnapshotItem{ + { + NodeID: 1, + Segments: []SegmentEntry{ + { + NodeID: 1, + SegmentID: 1, + }, + + { + NodeID: 1, + SegmentID: 3, + }, + }, + }, + { + NodeID: 2, + Segments: []SegmentEntry{ + { + NodeID: 2, + SegmentID: 2, + }, + }, + }, + }, + }, + } + + for _, tc := range cases { + s.Run(tc.tag, func() { + s.SetupTest() + defer s.TearDownTest() + + // peek during lock + s.dist.AddDistributions(tc.input...) + s.dist.mut.Lock() + sealed, _ := s.dist.Peek() + s.compareSnapshotItems(tc.expected, sealed) + s.dist.mut.Unlock() + }) + } +} + func TestDistributionSuite(t *testing.T) { suite.Run(t, new(DistributionSuite)) } diff --git a/internal/querynodev2/delegator/mock_delegator.go b/internal/querynodev2/delegator/mock_delegator.go index 0f986e1b1..bfeb28b5b 100644 --- a/internal/querynodev2/delegator/mock_delegator.go +++ b/internal/querynodev2/delegator/mock_delegator.go @@ -87,41 +87,50 @@ func (_c *MockShardDelegator_Collection_Call) Return(_a0 int64) *MockShardDelega return _c } -// GetDistribution provides a mock function with given fields: -func (_m *MockShardDelegator) GetDistribution() *distribution { +// GetSegmentInfo provides a mock function with given fields: +func (_m *MockShardDelegator) GetSegmentInfo() ([]SnapshotItem, []SegmentEntry) { ret := _m.Called() - var r0 *distribution - if rf, ok := ret.Get(0).(func() *distribution); ok { + var r0 []SnapshotItem + if rf, ok := ret.Get(0).(func() []SnapshotItem); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(*distribution) + r0 = ret.Get(0).([]SnapshotItem) } } - return r0 + var r1 []SegmentEntry + if rf, ok := ret.Get(1).(func() []SegmentEntry); ok { + r1 = rf() + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).([]SegmentEntry) + } + } + + return r0, r1 } -// MockShardDelegator_GetDistribution_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDistribution' -type MockShardDelegator_GetDistribution_Call struct { +// MockShardDelegator_GetSegmentInfo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentInfo' +type MockShardDelegator_GetSegmentInfo_Call struct { *mock.Call } -// GetDistribution is a helper method to define mock.On call -func (_e *MockShardDelegator_Expecter) GetDistribution() *MockShardDelegator_GetDistribution_Call { - return &MockShardDelegator_GetDistribution_Call{Call: _e.mock.On("GetDistribution")} +// GetSegmentInfo is a helper method to define mock.On call +func (_e *MockShardDelegator_Expecter) GetSegmentInfo() *MockShardDelegator_GetSegmentInfo_Call { + return &MockShardDelegator_GetSegmentInfo_Call{Call: _e.mock.On("GetSegmentInfo")} } -func (_c *MockShardDelegator_GetDistribution_Call) Run(run func()) *MockShardDelegator_GetDistribution_Call { +func (_c *MockShardDelegator_GetSegmentInfo_Call) Run(run func()) *MockShardDelegator_GetSegmentInfo_Call { _c.Call.Run(func(args mock.Arguments) { run() }) return _c } -func (_c *MockShardDelegator_GetDistribution_Call) Return(_a0 *distribution) *MockShardDelegator_GetDistribution_Call { - _c.Call.Return(_a0) +func (_c *MockShardDelegator_GetSegmentInfo_Call) Return(sealed []SnapshotItem, growing []SegmentEntry) *MockShardDelegator_GetSegmentInfo_Call { + _c.Call.Return(sealed, growing) return _c } diff --git a/internal/querynodev2/delegator/snapshot.go b/internal/querynodev2/delegator/snapshot.go index e48c8f9e8..24c73a6f9 100644 --- a/internal/querynodev2/delegator/snapshot.go +++ b/internal/querynodev2/delegator/snapshot.go @@ -76,6 +76,15 @@ func (s *snapshot) Expire(cleanup snapshotCleanup) { func (s *snapshot) Get(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry) { s.inUse.Inc() + return s.filter(partitions...) +} + +// Peek returns segment distributions without increasing inUse. +func (s *snapshot) Peek(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry) { + return s.filter(partitions...) +} + +func (s *snapshot) filter(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry) { filter := func(entry SegmentEntry, idx int) bool { return len(partitions) == 0 || funcutil.SliceContain(partitions, entry.PartitionID) } @@ -121,12 +130,3 @@ func (s *snapshot) checkCleared(cleanup snapshotCleanup) { }) } } - -func inList(list []int64, target int64) bool { - for _, i := range list { - if i == target { - return true - } - } - return false -} diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 5377074c6..241266135 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -902,9 +902,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get Version: value.Version(), }) - sealed, growing, version := value.GetDistribution().GetCurrent() - value.GetDistribution().FinishUsage(version) - + sealed, growing := value.GetSegmentInfo() sealedSegments := make(map[int64]*querypb.SegmentDist) for _, item := range sealed { for _, segment := range item.Segments { diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 806bfcc92..7af79914b 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1190,9 +1190,8 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { delegator, ok := suite.node.delegators.Get(suite.vchannel) suite.True(ok) - sealedSegments, _, version := delegator.GetDistribution().GetCurrent() + sealedSegments, _ := delegator.GetSegmentInfo() suite.Len(sealedSegments[0].Segments, 3) - delegator.GetDistribution().FinishUsage(version) // data req := &querypb.SyncDistributionRequest{ @@ -1215,9 +1214,8 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { status, err := suite.node.SyncDistribution(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) - sealedSegments, _, version = delegator.GetDistribution().GetCurrent() + sealedSegments, _ = delegator.GetSegmentInfo() suite.Len(sealedSegments[0].Segments, 3) - delegator.GetDistribution().FinishUsage(version) releaseAction = &querypb.SyncAction{ Type: querypb.SyncType_Remove, @@ -1230,9 +1228,8 @@ func (suite *ServiceSuite) TestSyncDistribution_ReleaseResultCheck() { status, err = suite.node.SyncDistribution(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) - sealedSegments, _, version = delegator.GetDistribution().GetCurrent() + sealedSegments, _ = delegator.GetSegmentInfo() suite.Len(sealedSegments[0].Segments, 2) - delegator.GetDistribution().FinishUsage(version) } func (suite *ServiceSuite) TestSyncDistribution_Failed() { -- GitLab