未验证 提交 5b22697e 编写于 作者: C congqixia 提交者: GitHub

Add segment manager unit tests (#7525)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 32650570
......@@ -12,6 +12,7 @@ package datacoord
import (
"context"
"errors"
"sync/atomic"
"time"
......@@ -53,6 +54,17 @@ func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) {
return val, nil
}
// FailsAllocator allocator that fails
type FailsAllocator struct{}
func (a *FailsAllocator) allocTimestamp(_ context.Context) (Timestamp, error) {
return 0, errors.New("always fail")
}
func (a *FailsAllocator) allocID(_ context.Context) (UniqueID, error) {
return 0, errors.New("always fail")
}
func newMockAllocator() *MockAllocator {
return &MockAllocator{}
}
......
......@@ -49,9 +49,7 @@ func AllocatePolicyV1(segments []*SegmentInfo, count int64,
existedSegmentAllocations := make([]*Allocation, 0)
// create new segment if count >= max num
for count >= maxCountPerSegment {
allocation := &Allocation{
NumOfRows: maxCountPerSegment,
}
allocation := getAllocation(maxCountPerSegment)
newSegmentAllocations = append(newSegmentAllocations, allocation)
count -= maxCountPerSegment
}
......@@ -69,18 +67,14 @@ func AllocatePolicyV1(segments []*SegmentInfo, count int64,
if free < count {
continue
}
allocation := &Allocation{
SegmentID: segment.GetID(),
NumOfRows: count,
}
allocation := getAllocation(count)
allocation.SegmentID = segment.GetID()
existedSegmentAllocations = append(existedSegmentAllocations, allocation)
return newSegmentAllocations, existedSegmentAllocations
}
// allocate new segment for remaining count
allocation := &Allocation{
NumOfRows: count,
}
allocation := getAllocation(count)
newSegmentAllocations = append(newSegmentAllocations, allocation)
return newSegmentAllocations, existedSegmentAllocations
}
......@@ -122,6 +116,9 @@ func getChannelOpenSegCapacityPolicy(limit int) channelSealPolicy {
}
sortSegmentsByLastExpires(segs)
offLen := len(segs) - limit
if offLen > len(segs) {
offLen = len(segs)
}
return segs[0:offLen]
}
}
......
......@@ -28,6 +28,37 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
)
var (
allocPool = sync.Pool{
New: func() interface{} {
return &Allocation{}
},
}
)
// getAllocation unified way to retrieve allocation struct
func getAllocation(numOfRows int64) *Allocation {
v := allocPool.Get()
a, ok := v.(*Allocation)
if !ok {
a = &Allocation{}
}
if a == nil {
return &Allocation{
NumOfRows: numOfRows,
}
}
a.NumOfRows = numOfRows
a.ExpireTime = 0
a.SegmentID = 0
return a
}
// putAllocation put allocation for recycling
func putAllocation(a *Allocation) {
allocPool.Put(a)
}
const segmentMaxLifetime = 24 * time.Hour
// Manager manage segment related operations.
......@@ -63,7 +94,6 @@ type SegmentManager struct {
segmentSealPolicies []segmentSealPolicy
channelSealPolicies []channelSealPolicy
flushPolicy flushPolicy
allocPool sync.Pool
}
type allocHelper struct {
......@@ -157,11 +187,6 @@ func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *Se
segmentSealPolicies: defaultSegmentSealPolicy(), // default only segment size policy
channelSealPolicies: []channelSealPolicy{}, // no default channel seal policy
flushPolicy: defaultFlushPolicy(),
allocPool: sync.Pool{
New: func() interface{} {
return &Allocation{}
},
},
}
for _, opt := range opts {
opt.apply(manager)
......@@ -180,27 +205,6 @@ func (s *SegmentManager) loadSegmentsFromMeta() {
s.segments = segmentsID
}
// getAllocation unified way to retrieve allocation struct
func (s *SegmentManager) getAllocation(numOfRows int64) *Allocation {
v := s.allocPool.Get()
if v == nil {
return &Allocation{
NumOfRows: numOfRows,
}
}
a, ok := v.(*Allocation)
if !ok {
a = &Allocation{}
}
a.NumOfRows = numOfRows
return a
}
// putAllocation put allocation for recycling
func (s *SegmentManager) putAllocation(a *Allocation) {
s.allocPool.Put(a)
}
// AllocSegment allocate segment per request collcation, partication, channel and rows
func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID,
partitionID UniqueID, channelName string, requestRows int64) ([]*Allocation, error) {
......@@ -339,7 +343,7 @@ func (s *SegmentManager) DropSegment(ctx context.Context, segmentID UniqueID) {
}
s.meta.SetAllocations(segmentID, []*Allocation{})
for _, allocation := range segment.allocations {
s.putAllocation(allocation)
putAllocation(allocation)
}
}
......@@ -407,7 +411,7 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
for i := 0; i < len(segment.allocations); i++ {
if segment.allocations[i].ExpireTime <= ts {
a := segment.allocations[i]
s.putAllocation(a)
putAllocation(a)
} else {
allocations = append(allocations, segment.allocations[i])
}
......
......@@ -11,9 +11,15 @@ package datacoord
import (
"context"
"errors"
"fmt"
"math"
"sync"
"testing"
"time"
"github.com/milvus-io/milvus/internal/kv"
memkv "github.com/milvus-io/milvus/internal/kv/mem"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
......@@ -21,6 +27,67 @@ import (
"github.com/stretchr/testify/assert"
)
func TestManagerOptions(t *testing.T) {
// ctx := context.Background()
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
segmentManager := newSegmentManager(meta, mockAllocator)
t.Run("test with alloc helper", func(t *testing.T) {
opt := withAllocHelper(allocHelper{})
opt.apply(segmentManager)
assert.True(t, segmentManager.helper.afterCreateSegment == nil)
})
t.Run("test withCalUpperLimitPolicy", func(t *testing.T) {
opt := withCalUpperLimitPolicy(defaultCalUpperLimitPolicy())
assert.NotNil(t, opt)
//manual set nil``
segmentManager.estimatePolicy = nil
opt.apply(segmentManager)
assert.True(t, segmentManager.estimatePolicy != nil)
})
t.Run("test withAllocPolicy", func(t *testing.T) {
opt := withAllocPolicy(defaultAlocatePolicy())
assert.NotNil(t, opt)
// manual set nil
segmentManager.allocPolicy = nil
opt.apply(segmentManager)
assert.True(t, segmentManager.allocPolicy != nil)
})
t.Run("test withSegmentSealPolicy", func(t *testing.T) {
opt := withSegmentSealPolices(defaultSegmentSealPolicy()...)
assert.NotNil(t, opt)
// manual set nil
segmentManager.segmentSealPolicies = []segmentSealPolicy{}
opt.apply(segmentManager)
assert.True(t, len(segmentManager.segmentSealPolicies) > 0)
})
t.Run("test withChannelSealPolicies", func(t *testing.T) {
opt := withChannelSealPolices(getChannelOpenSegCapacityPolicy(1000))
assert.NotNil(t, opt)
// manaul set nil
segmentManager.channelSealPolicies = []channelSealPolicy{}
opt.apply(segmentManager)
assert.True(t, len(segmentManager.channelSealPolicies) > 0)
})
t.Run("test withFlushPolicy", func(t *testing.T) {
opt := withFlushPolicy(defaultFlushPolicy())
assert.NotNil(t, opt)
// manual set nil
segmentManager.flushPolicy = nil
opt.apply(segmentManager)
assert.True(t, segmentManager.flushPolicy != nil)
})
}
func TestAllocSegment(t *testing.T) {
ctx := context.Background()
Params.Init()
......@@ -42,6 +109,13 @@ func TestAllocSegment(t *testing.T) {
assert.NotEqualValues(t, 0, allocations[0].SegmentID)
assert.NotEqualValues(t, 0, allocations[0].ExpireTime)
})
t.Run("allocation fails", func(t *testing.T) {
failsAllocator := &FailsAllocator{}
segmentManager := newSegmentManager(meta, failsAllocator)
_, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
assert.NotNil(t, err)
})
}
func TestLoadSegmentsFromMeta(t *testing.T) {
......@@ -241,3 +315,190 @@ func TestGetFlushableSegments(t *testing.T) {
assert.EqualValues(t, allocations[0].SegmentID, ids[0])
})
}
// a mock kv that always fail when do `Save`
type saveFailKv struct {
kv.TxnKV
}
// LoadWithPrefix override behavior
func (kv *saveFailKv) Save(key, value string) error {
fmt.Println("here")
return errors.New("mocked fail")
}
func TestTryToSealSegment(t *testing.T) {
t.Run("normal seal with segment policies", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
for _, seg := range segmentManager.meta.segments.segments {
assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
}
})
t.Run("normal seal with channel seal policies", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
for _, seg := range segmentManager.meta.segments.segments {
assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
}
})
t.Run("normal seal with both segment & channel seal policy", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator,
withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64)),
withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
err = segmentManager.tryToSealSegment(ts, "c1")
assert.Nil(t, err)
for _, seg := range segmentManager.meta.segments.segments {
assert.Equal(t, commonpb.SegmentState_Sealed, seg.GetState())
}
})
t.Run("seal with segment policy with kv fails", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKv{TxnKV: memoryKV}
meta, err := NewMeta(memoryKV)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withSegmentSealPolices(sealByLifetimePolicy(math.MinInt64))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
segmentManager.meta.client = fkv
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
err = segmentManager.tryToSealSegment(ts, "c1")
assert.NotNil(t, err)
})
t.Run("seal with channel policy with kv fails", func(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKv{TxnKV: memoryKV}
meta, err := NewMeta(memoryKV)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID(context.Background())
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
segmentManager := newSegmentManager(meta, mockAllocator, withChannelSealPolices(getChannelOpenSegCapacityPolicy(-1))) //always seal
allocations, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "c1", 2)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocations))
segmentManager.meta.client = fkv
ts, err := segmentManager.allocator.allocTimestamp(context.Background())
assert.Nil(t, err)
err = segmentManager.tryToSealSegment(ts, "c1")
assert.NotNil(t, err)
})
}
func TestAllocationPool(t *testing.T) {
t.Run("normal get&put", func(t *testing.T) {
allocPool = sync.Pool{
New: func() interface{} {
return &Allocation{}
},
}
allo := getAllocation(100)
assert.EqualValues(t, 100, allo.NumOfRows)
assert.EqualValues(t, 0, allo.ExpireTime)
assert.EqualValues(t, 0, allo.SegmentID)
putAllocation(allo)
})
t.Run("put nil", func(t *testing.T) {
var allo *Allocation = nil
allocPool = sync.Pool{
New: func() interface{} {
return &Allocation{}
},
}
putAllocation(allo)
allo = getAllocation(100)
assert.EqualValues(t, 100, allo.NumOfRows)
assert.EqualValues(t, 0, allo.ExpireTime)
assert.EqualValues(t, 0, allo.SegmentID)
})
t.Run("put something else", func(t *testing.T) {
allocPool = sync.Pool{
New: func() interface{} {
return &Allocation{}
},
}
allocPool.Put(&struct{}{})
allo := getAllocation(100)
assert.EqualValues(t, 100, allo.NumOfRows)
assert.EqualValues(t, 0, allo.ExpireTime)
assert.EqualValues(t, 0, allo.SegmentID)
})
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册