segment_allocator_test.go 3.9 KB
Newer Older
S
sunby 已提交
1 2 3
package dataservice

import (
S
sunby 已提交
4
	"log"
S
sunby 已提交
5 6 7 8 9
	"math"
	"strconv"
	"testing"
	"time"

S
sunby 已提交
10 11
	"github.com/zilliztech/milvus-distributed/internal/util/tsoutil"

S
sunby 已提交
12 13 14 15 16 17 18 19
	"github.com/stretchr/testify/assert"
)

func TestAllocSegment(t *testing.T) {
	Params.Init()
	mockAllocator := newMockAllocator()
	meta, err := newMemoryMeta(mockAllocator)
	assert.Nil(t, err)
Y
yukun 已提交
20
	segAllocator := newSegmentAllocator(meta, mockAllocator)
S
sunby 已提交
21

S
sunby 已提交
22
	schema := newTestSchema()
S
sunby 已提交
23
	collID, err := mockAllocator.allocID()
S
sunby 已提交
24
	assert.Nil(t, err)
S
sunby 已提交
25 26 27 28 29
	err = meta.AddCollection(&collectionInfo{
		ID:     collID,
		Schema: schema,
	})
	assert.Nil(t, err)
N
neza2017 已提交
30 31
	id, err := mockAllocator.allocID()
	assert.Nil(t, err)
S
sunby 已提交
32
	segmentInfo, err := BuildSegment(collID, 100, id, "c1")
S
sunby 已提交
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	assert.Nil(t, err)
	err = meta.AddSegment(segmentInfo)
	assert.Nil(t, err)
	err = segAllocator.OpenSegment(segmentInfo)
	assert.Nil(t, err)

	cases := []struct {
		collectionID UniqueID
		partitionID  UniqueID
		channelName  string
		requestRows  int
		expectResult bool
	}{
		{collID, 100, "c1", 100, true},
		{collID + 1, 100, "c1", 100, false},
		{collID, 101, "c1", 100, false},
		{collID, 100, "c3", 100, false},
		{collID, 100, "c1", math.MaxInt64, false},
	}
	for _, c := range cases {
		id, count, expireTime, err := segAllocator.AllocSegment(c.collectionID, c.partitionID, c.channelName, c.requestRows)
		if c.expectResult {
			assert.Nil(t, err)
			assert.EqualValues(t, c.requestRows, count)
			assert.NotEqualValues(t, 0, id)
			assert.NotEqualValues(t, 0, expireTime)
		} else {
			assert.NotNil(t, err)
		}
	}
}

func TestSealSegment(t *testing.T) {
	Params.Init()
	mockAllocator := newMockAllocator()
	meta, err := newMemoryMeta(mockAllocator)
	assert.Nil(t, err)
Y
yukun 已提交
70
	segAllocator := newSegmentAllocator(meta, mockAllocator)
S
sunby 已提交
71

S
sunby 已提交
72
	schema := newTestSchema()
S
sunby 已提交
73
	collID, err := mockAllocator.allocID()
S
sunby 已提交
74
	assert.Nil(t, err)
S
sunby 已提交
75 76 77 78 79 80 81
	err = meta.AddCollection(&collectionInfo{
		ID:     collID,
		Schema: schema,
	})
	assert.Nil(t, err)
	var lastSegID UniqueID
	for i := 0; i < 10; i++ {
N
neza2017 已提交
82 83
		id, err := mockAllocator.allocID()
		assert.Nil(t, err)
S
sunby 已提交
84
		segmentInfo, err := BuildSegment(collID, 100, id, "c"+strconv.Itoa(i))
S
sunby 已提交
85 86 87 88 89 90 91 92 93 94
		assert.Nil(t, err)
		err = meta.AddSegment(segmentInfo)
		assert.Nil(t, err)
		err = segAllocator.OpenSegment(segmentInfo)
		assert.Nil(t, err)
		lastSegID = segmentInfo.SegmentID
	}

	err = segAllocator.SealSegment(lastSegID)
	assert.Nil(t, err)
N
neza2017 已提交
95
	segAllocator.SealAllSegments(collID)
S
sunby 已提交
96 97
	sealedSegments, err := segAllocator.GetSealedSegments()
	assert.Nil(t, err)
S
sunby 已提交
98
	assert.EqualValues(t, 10, len(sealedSegments))
S
sunby 已提交
99 100 101 102 103 104 105
}

func TestExpireSegment(t *testing.T) {
	Params.Init()
	mockAllocator := newMockAllocator()
	meta, err := newMemoryMeta(mockAllocator)
	assert.Nil(t, err)
Y
yukun 已提交
106
	segAllocator := newSegmentAllocator(meta, mockAllocator)
S
sunby 已提交
107

S
sunby 已提交
108
	schema := newTestSchema()
S
sunby 已提交
109
	collID, err := mockAllocator.allocID()
S
sunby 已提交
110
	assert.Nil(t, err)
S
sunby 已提交
111 112 113 114 115
	err = meta.AddCollection(&collectionInfo{
		ID:     collID,
		Schema: schema,
	})
	assert.Nil(t, err)
N
neza2017 已提交
116 117
	id, err := mockAllocator.allocID()
	assert.Nil(t, err)
S
sunby 已提交
118
	segmentInfo, err := BuildSegment(collID, 100, id, "c1")
S
sunby 已提交
119 120 121 122 123 124
	assert.Nil(t, err)
	err = meta.AddSegment(segmentInfo)
	assert.Nil(t, err)
	err = segAllocator.OpenSegment(segmentInfo)
	assert.Nil(t, err)

S
sunby 已提交
125 126 127
	id1, _, et, err := segAllocator.AllocSegment(collID, 100, "c1", 10)
	ts2, _ := tsoutil.ParseTS(et)
	log.Printf("physical ts: %s", ts2.String())
S
sunby 已提交
128
	assert.Nil(t, err)
S
sunby 已提交
129

S
sunby 已提交
130 131
	ts, err := mockAllocator.allocTimestamp()
	assert.Nil(t, err)
S
sunby 已提交
132 133 134 135 136
	t1, _ := tsoutil.ParseTS(ts)
	log.Printf("before ts: %s", t1.String())
	time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
	ts, err = mockAllocator.allocTimestamp()
	assert.Nil(t, err)
S
sunby 已提交
137 138 139
	err = segAllocator.ExpireAllocations(ts)
	assert.Nil(t, err)
	expired, err := segAllocator.IsAllocationsExpired(id1, ts)
S
sunby 已提交
140 141 142 143
	if et > ts {
		tsPhy, _ := tsoutil.ParseTS(ts)
		log.Printf("ts %s", tsPhy.String())
	}
S
sunby 已提交
144 145 146 147
	assert.Nil(t, err)
	assert.True(t, expired)
	assert.EqualValues(t, 0, len(segAllocator.segments[id1].allocations))
}