watcher_test.go 2.6 KB
Newer Older
X
Xiangyu Wang 已提交
1 2 3
package dataservice

import (
紫晴 已提交
4
	"context"
X
Xiangyu Wang 已提交
5 6 7 8
	"strconv"
	"testing"
	"time"

S
sunby 已提交
9 10
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"

X
Xiangyu Wang 已提交
11 12
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
G
godchen 已提交
13
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
X
Xiangyu Wang 已提交
14 15 16 17 18

	"github.com/stretchr/testify/assert"
)

func TestDataNodeTTWatcher(t *testing.T) {
紫晴 已提交
19
	ctx := context.Background()
X
Xiangyu Wang 已提交
20 21 22 23 24 25 26 27
	Params.Init()
	c := make(chan struct{})
	cluster := newDataNodeCluster(c)
	defer cluster.ShutDownClients()
	schema := newTestSchema()
	allocator := newMockAllocator()
	meta, err := newMemoryMeta(allocator)
	assert.Nil(t, err)
Y
yukun 已提交
28
	segAllocator := newSegmentAllocator(meta, allocator)
X
Xiangyu Wang 已提交
29 30 31 32 33
	assert.Nil(t, err)
	watcher := newDataNodeTimeTickWatcher(meta, segAllocator, cluster)

	id, err := allocator.allocID()
	assert.Nil(t, err)
S
sunby 已提交
34
	err = meta.AddCollection(&datapb.CollectionInfo{
X
Xiangyu Wang 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
		Schema: schema,
		ID:     id,
	})
	assert.Nil(t, err)

	cases := []struct {
		sealed     bool
		allocation bool
		expired    bool
		expected   bool
	}{
		{false, false, true, false},
		{false, true, true, false},
		{false, true, false, false},
		{true, false, true, true},
		{true, true, false, false},
		{true, true, true, true},
	}

	segmentIDs := make([]UniqueID, len(cases))
	for i, c := range cases {
		segID, err := allocator.allocID()
		segmentIDs[i] = segID
		assert.Nil(t, err)
S
sunby 已提交
59
		segmentInfo, err := BuildSegment(id, 100, segID, "channel"+strconv.Itoa(i))
X
Xiangyu Wang 已提交
60 61 62
		assert.Nil(t, err)
		err = meta.AddSegment(segmentInfo)
		assert.Nil(t, err)
紫晴 已提交
63
		err = segAllocator.OpenSegment(ctx, segmentInfo)
X
Xiangyu Wang 已提交
64 65
		assert.Nil(t, err)
		if c.allocation && c.expired {
紫晴 已提交
66
			_, _, _, err := segAllocator.AllocSegment(ctx, id, 100, "channel"+strconv.Itoa(i), 100)
X
Xiangyu Wang 已提交
67 68 69 70
			assert.Nil(t, err)
		}
	}

S
sunby 已提交
71
	time.Sleep(time.Duration(Params.SegIDAssignExpiration+1000) * time.Millisecond)
X
Xiangyu Wang 已提交
72 73
	for i, c := range cases {
		if c.allocation && !c.expired {
紫晴 已提交
74
			_, _, _, err := segAllocator.AllocSegment(ctx, id, 100, "channel"+strconv.Itoa(i), 100)
X
Xiangyu Wang 已提交
75 76 77
			assert.Nil(t, err)
		}
		if c.sealed {
紫晴 已提交
78
			err := segAllocator.SealSegment(ctx, segmentIDs[i])
X
Xiangyu Wang 已提交
79 80 81 82 83 84 85 86 87 88
			assert.Nil(t, err)
		}
	}
	ts, err := allocator.allocTimestamp()
	assert.Nil(t, err)

	err = watcher.handleTimeTickMsg(&msgstream.TimeTickMsg{
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
G
godchen 已提交
89
		TimeTickMsg: internalpb.TimeTickMsg{
X
Xiangyu Wang 已提交
90
			Base: &commonpb.MsgBase{
91
				MsgType:   commonpb.MsgType_TimeTick,
X
Xiangyu Wang 已提交
92 93 94 95 96 97 98 99 100 101
				Timestamp: ts,
			},
		},
	})
	assert.Nil(t, err)
	for i, c := range cases {
		_, ok := segAllocator.segments[segmentIDs[i]]
		assert.EqualValues(t, !c.expected, ok)
	}
}