diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 3c8f99c22978334ad77ec51f8f8e92c1204fa67d..976d79dc9b6deb149735dc948cc6a4c6756b5bcb 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -12,6 +12,7 @@ package datacoord import ( "context" + "errors" "sync/atomic" "time" @@ -53,6 +54,17 @@ func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) { return val, nil } +// FailsAllocator allocator that fails +type FailsAllocator struct{} + +func (a *FailsAllocator) allocTimestamp(_ context.Context) (Timestamp, error) { + return 0, errors.New("always fail") +} + +func (a *FailsAllocator) allocID(_ context.Context) (UniqueID, error) { + return 0, errors.New("always fail") +} + func newMockAllocator() *MockAllocator { return &MockAllocator{} } diff --git a/internal/datacoord/segment_allocation_policy.go b/internal/datacoord/segment_allocation_policy.go index 842ca6c3d5942f705b0ceb635ee37981b755666f..7a48587ac1fd6620801420fc1aa7b7cc1eb79231 100644 --- a/internal/datacoord/segment_allocation_policy.go +++ b/internal/datacoord/segment_allocation_policy.go @@ -49,9 +49,7 @@ func AllocatePolicyV1(segments []*SegmentInfo, count int64, existedSegmentAllocations := make([]*Allocation, 0) // create new segment if count >= max num for count >= maxCountPerSegment { - allocation := &Allocation{ - NumOfRows: maxCountPerSegment, - } + allocation := getAllocation(maxCountPerSegment) newSegmentAllocations = append(newSegmentAllocations, allocation) count -= maxCountPerSegment } @@ -69,18 +67,14 @@ func AllocatePolicyV1(segments []*SegmentInfo, count int64, if free < count { continue } - allocation := &Allocation{ - SegmentID: segment.GetID(), - NumOfRows: count, - } + allocation := getAllocation(count) + allocation.SegmentID = segment.GetID() existedSegmentAllocations = append(existedSegmentAllocations, allocation) return newSegmentAllocations, existedSegmentAllocations } // allocate new segment for remaining count - allocation := &Allocation{ - NumOfRows: count, - } + allocation := getAllocation(count) newSegmentAllocations = append(newSegmentAllocations, allocation) return newSegmentAllocations, existedSegmentAllocations } @@ -122,6 +116,9 @@ func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy { } sortSegmentsByLastExpires(segs) offLen := len(segs) - limit + if offLen > len(segs) { + offLen = len(segs) + } return segs[0:offLen] } } diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index b68e441d2d9f4aae0ecce6ab72d9e8339516a657..207804daeeb44a51b237913632e1227a453178be 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -28,6 +28,37 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" ) +var ( + allocPool = sync.Pool{ + New: func() interface{} { + return &Allocation{} + }, + } +) + +// getAllocation unified way to retrieve allocation struct +func getAllocation(numOfRows int64) *Allocation { + v := allocPool.Get() + a, ok := v.(*Allocation) + if !ok { + a = &Allocation{} + } + if a == nil { + return &Allocation{ + NumOfRows: numOfRows, + } + } + a.NumOfRows = numOfRows + a.ExpireTime = 0 + a.SegmentID = 0 + return a +} + +// putAllocation put allocation for recycling +func putAllocation(a *Allocation) { + allocPool.Put(a) +} + const segmentMaxLifetime = 24 * time.Hour // Manager manage segment related operations. @@ -63,7 +94,6 @@ type SegmentManager struct { segmentSealPolicies []segmentSealPolicy channelSealPolicies []channelSealPolicy flushPolicy flushPolicy - allocPool sync.Pool } type allocHelper struct { @@ -157,11 +187,6 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy flushPolicy: defaultFlushPolicy(), - allocPool: sync.Pool{ - New: func() interface{} { - return &Allocation{} - }, - }, } for _, opt := range opts { opt.apply(manager) @@ -180,27 +205,6 @@ func (s *SegmentManager) loadSegmentsFromMeta() { s.segments = segmentsID } -// getAllocation unified way to retrieve allocation struct -func (s *SegmentManager) getAllocation(numOfRows int64) *Allocation { - v := s.allocPool.Get() - if v == nil { - return &Allocation{ - NumOfRows: numOfRows, - } - } - a, ok := v.(*Allocation) - if !ok { - a = &Allocation{} - } - a.NumOfRows = numOfRows - return a -} - -// putAllocation put allocation for recycling -func (s *SegmentManager) putAllocation(a *Allocation) { - s.allocPool.Put(a) -} - // AllocSegment allocate segment per request collcation, partication, channel and rows func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID, partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) { @@ -339,7 +343,7 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) { } s.meta.SetAllocations(segmentID, []*Allocation{}) for _, allocation := range segment.allocations { - s.putAllocation(allocation) + putAllocation(allocation) } } @@ -407,7 +411,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error { for i := 0; i < len(segment.allocations); i++ { if segment.allocations[i].ExpireTime <= ts { a := segment.allocations[i] - s.putAllocation(a) + putAllocation(a) } else { allocations = append(allocations, segment.allocations[i]) } diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 3331c052ddacf82a4395500f2babab17bbb1aff8..57b21ef7cd22c0d7ea9f3b37efe2eef8d52546fc 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -11,9 +11,15 @@ package datacoord import ( "context" + "errors" + "fmt" + "math" + "sync" "testing" "time" + "github.com/milvus-io/milvus/internal/kv" + memkv "github.com/milvus-io/milvus/internal/kv/mem" "github.com/milvus-io/milvus/internal/proto/commonpb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/schemapb" @@ -21,6 +27,67 @@ import ( "github.com/stretchr/testify/assert" ) +func TestManagerOptions(t *testing.T) { + // ctx := context.Background() + Params.Init() + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + segmentManager := newSegmentManager(meta, mockAllocator) + + t.Run("test with alloc helper", func(t *testing.T) { + opt := withAllocHelper(allocHelper{}) + opt.apply(segmentManager) + + assert.True(t, segmentManager.helper.afterCreateSegment == nil) + }) + + t.Run("test withCalUpperLimitPolicy", func(t *testing.T) { + opt := withCalUpperLimitPolicy(defaultCalUpperLimitPolicy()) + assert.NotNil(t, opt) + + //manual set nil`` + segmentManager.estimatePolicy = nil + opt.apply(segmentManager) + assert.True(t, segmentManager.estimatePolicy != nil) + }) + + t.Run("test withAllocPolicy", func(t *testing.T) { + opt := withAllocPolicy(defaultAlocatePolicy()) + assert.NotNil(t, opt) + // manual set nil + segmentManager.allocPolicy = nil + opt.apply(segmentManager) + assert.True(t, segmentManager.allocPolicy != nil) + }) + + t.Run("test withSegmentSealPolicy", func(t *testing.T) { + opt := withSegmentSealPolices(defaultSegmentSealPolicy()...) + assert.NotNil(t, opt) + // manual set nil + segmentManager.segmentSealPolicies = []segmentSealPolicy{} + opt.apply(segmentManager) + assert.True(t, len(segmentManager.segmentSealPolicies) > 0) + }) + + t.Run("test withChannelSealPolicies", func(t *testing.T) { + opt := withChannelSealPolices(getChannelOpenSegCapacityPolicy(1000)) + assert.NotNil(t, opt) + // manaul set nil + segmentManager.channelSealPolicies = []channelSealPolicy{} + opt.apply(segmentManager) + assert.True(t, len(segmentManager.channelSealPolicies) > 0) + }) + t.Run("test withFlushPolicy", func(t *testing.T) { + opt := withFlushPolicy(defaultFlushPolicy()) + assert.NotNil(t, opt) + // manual set nil + segmentManager.flushPolicy = nil + opt.apply(segmentManager) + assert.True(t, segmentManager.flushPolicy != nil) + }) +} + func TestAllocSegment(t *testing.T) { ctx := context.Background() Params.Init() @@ -42,6 +109,13 @@ func TestAllocSegment(t *testing.T) { assert.NotEqualValues(t, 0, allocations[0].SegmentID) assert.NotEqualValues(t, 0, allocations[0].ExpireTime) }) + + t.Run("allocation fails", func(t *testing.T) { + failsAllocator := &FailsAllocator{} + segmentManager := newSegmentManager(meta, failsAllocator) + _, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100) + assert.NotNil(t, err) + }) } func TestLoadSegmentsFromMeta(t *testing.T) { @@ -241,3 +315,190 @@ func TestGetFlushableSegments(t *testing.T) { assert.EqualValues(t, allocations[0].SegmentID, ids[0]) }) } + +// a mock kv that always fail when do `Save` +type saveFailKv struct { + kv.TxnKV +} + +// LoadWithPrefix override behavior +func (kv *saveFailKv) Save(key, value string) error { + fmt.Println("here") + return errors.New("mocked fail") +} + +func TestTryToSealSegment(t *testing.T) { + t.Run("normal seal with segment policies", func(t *testing.T) { + Params.Init() + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.allocID(context.Background()) + assert.Nil(t, err) + meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) + segmentManager := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal + allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + + ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + assert.Nil(t, err) + err = segmentManager.tryToSealSegment(ts, "c1") + assert.Nil(t, err) + + for _, seg := range segmentManager.meta.segments.segments { + assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState()) + } + }) + + t.Run("normal seal with channel seal policies", func(t *testing.T) { + Params.Init() + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.allocID(context.Background()) + assert.Nil(t, err) + meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) + segmentManager := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal + allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + + ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + assert.Nil(t, err) + err = segmentManager.tryToSealSegment(ts, "c1") + assert.Nil(t, err) + + for _, seg := range segmentManager.meta.segments.segments { + assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState()) + } + }) + + t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) { + Params.Init() + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.allocID(context.Background()) + assert.Nil(t, err) + meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) + segmentManager := newSegmentManager(meta, mockAllocator, + withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64)), + withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal + allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + + ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + assert.Nil(t, err) + err = segmentManager.tryToSealSegment(ts, "c1") + assert.Nil(t, err) + + for _, seg := range segmentManager.meta.segments.segments { + assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState()) + } + }) + + t.Run("seal with segment policy with kv fails", func(t *testing.T) { + Params.Init() + mockAllocator := newMockAllocator() + memoryKV := memkv.NewMemoryKV() + fkv := &saveFailKv{TxnKV: memoryKV} + meta, err := NewMeta(memoryKV) + + assert.Nil(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.allocID(context.Background()) + assert.Nil(t, err) + meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) + segmentManager := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal + allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + + segmentManager.meta.client = fkv + + ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + assert.Nil(t, err) + err = segmentManager.tryToSealSegment(ts, "c1") + assert.NotNil(t, err) + }) + + t.Run("seal with channel policy with kv fails", func(t *testing.T) { + Params.Init() + mockAllocator := newMockAllocator() + memoryKV := memkv.NewMemoryKV() + fkv := &saveFailKv{TxnKV: memoryKV} + meta, err := NewMeta(memoryKV) + + assert.Nil(t, err) + + schema := newTestSchema() + collID, err := mockAllocator.allocID(context.Background()) + assert.Nil(t, err) + meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema}) + segmentManager := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal + allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2) + assert.Nil(t, err) + assert.EqualValues(t, 1, len(allocations)) + + segmentManager.meta.client = fkv + + ts, err := segmentManager.allocator.allocTimestamp(context.Background()) + assert.Nil(t, err) + err = segmentManager.tryToSealSegment(ts, "c1") + assert.NotNil(t, err) + }) +} + +func TestAllocationPool(t *testing.T) { + t.Run("normal get&put", func(t *testing.T) { + allocPool = sync.Pool{ + New: func() interface{} { + return &Allocation{} + }, + } + + allo := getAllocation(100) + assert.EqualValues(t, 100, allo.NumOfRows) + assert.EqualValues(t, 0, allo.ExpireTime) + assert.EqualValues(t, 0, allo.SegmentID) + + putAllocation(allo) + }) + + t.Run("put nil", func(t *testing.T) { + var allo *Allocation = nil + allocPool = sync.Pool{ + New: func() interface{} { + return &Allocation{} + }, + } + putAllocation(allo) + allo = getAllocation(100) + assert.EqualValues(t, 100, allo.NumOfRows) + assert.EqualValues(t, 0, allo.ExpireTime) + assert.EqualValues(t, 0, allo.SegmentID) + }) + + t.Run("put something else", func(t *testing.T) { + allocPool = sync.Pool{ + New: func() interface{} { + return &Allocation{} + }, + } + allocPool.Put(&struct{}{}) + allo := getAllocation(100) + assert.EqualValues(t, 100, allo.NumOfRows) + assert.EqualValues(t, 0, allo.ExpireTime) + assert.EqualValues(t, 0, allo.SegmentID) + + }) +}