未验证 提交 24a4d231 编写于 作者: X XuanYang-cn 提交者: GitHub

Fix DataNode ut never meet condition (#22094)

See also: #22079
Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 1ac16f9d
......@@ -27,9 +27,12 @@ import (
func TestCompactionExecutor(t *testing.T) {
t.Run("Test execute", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ex := newCompactionExecutor()
go ex.start(context.TODO())
go ex.start(ctx)
ex.execute(newMockCompactor(true))
cancel()
})
t.Run("Test stopTask", func(t *testing.T) {
......
......@@ -26,6 +26,7 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
......@@ -199,7 +200,6 @@ func TestDataSyncService_Start(t *testing.T) {
// init data node
insertChannelName := "by-dev-rootcoord-dml"
ddlChannelName := "by-dev-rootcoord-ddl"
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
......@@ -320,22 +320,14 @@ func TestDataSyncService_Start(t *testing.T) {
insertStream, _ := factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{insertChannelName})
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsProducer([]string{ddlChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
insertMsgStream.Start()
var ddMsgStream msgstream.MsgStream = ddStream
ddMsgStream.Start()
err = insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
select {
case flushPack := <-sync.flushListener:
......@@ -354,42 +346,35 @@ func TestDataSyncService_Close(t *testing.T) {
defer cancel()
os.RemoveAll("/tmp/milvus")
defer os.RemoveAll("/tmp/milvus")
Params.DataNodeCfg.FlushInsertBufferSize = 16 * (1 << 20) // 16MB
// init data node
insertChannelName := "by-dev-rootcoord-dml2"
ddlChannelName := "by-dev-rootcoord-ddl2"
var (
insertChannelName = "by-dev-rootcoord-dml2"
Factory := &MetaFactory{}
collMeta := Factory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
mockRootCoord := &RootCoordFactory{
pkType: schemapb.DataType_Int64,
}
metaFactory = &MetaFactory{}
mockRootCoord = &RootCoordFactory{pkType: schemapb.DataType_Int64}
flushChan := make(chan flushMsg, 100)
resendTTChan := make(chan resendTTMsg, 100)
cm := storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
collMeta = metaFactory.GetCollectionMeta(UniqueID(0), "coll1", schemapb.DataType_Int64)
cm = storage.NewLocalChunkManager(storage.RootPath(dataSyncServiceTestDir))
)
defer cm.RemoveWithPrefix(ctx, cm.RootPath())
channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
allocFactory := NewAllocatorFactory(1)
factory := dependency.NewDefaultFactory(true)
defer os.RemoveAll("/tmp/milvus")
Params.DataNodeCfg.FlushInsertBufferSize = 0
ufs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
ID: 0,
NumOfRows: 0,
ID: 1,
NumOfRows: 1,
DmlPosition: &internalpb.MsgPosition{},
}}
fs := []*datapb.SegmentInfo{{
CollectionID: collMeta.ID,
PartitionID: 1,
InsertChannel: insertChannelName,
ID: 1,
NumOfRows: 0,
ID: 0,
NumOfRows: 1,
DmlPosition: &internalpb.MsgPosition{},
}}
var ufsIds []int64
......@@ -407,10 +392,16 @@ func TestDataSyncService_Close(t *testing.T) {
FlushedSegmentIds: fsIds,
}
signalCh := make(chan string, 100)
var (
flushChan = make(chan flushMsg, 100)
resendTTChan = make(chan resendTTMsg, 100)
signalCh = make(chan string, 100)
dataCoord := &DataCoordFactory{}
dataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
allocFactory = NewAllocatorFactory(1)
factory = dependency.NewDefaultFactory(true)
mockDataCoord = &DataCoordFactory{}
)
mockDataCoord.UserSegmentInfo = map[int64]*datapb.SegmentInfo{
0: {
ID: 0,
CollectionID: collMeta.ID,
......@@ -426,15 +417,19 @@ func TestDataSyncService_Close(t *testing.T) {
},
}
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, dataCoord, newCache(), cm, newCompactionExecutor())
channel := newChannel(insertChannelName, collMeta.ID, collMeta.GetSchema(), mockRootCoord, cm)
sync, err := newDataSyncService(ctx, flushChan, resendTTChan, channel, allocFactory, factory, vchan, signalCh, mockDataCoord, newCache(), cm, newCompactionExecutor())
assert.Nil(t, err)
sync.flushListener = make(chan *segmentFlushPack, 10)
defer close(sync.flushListener)
sync.start()
dataFactory := NewDataFactory()
ts := tsoutil.GetCurrentTime()
var (
dataFactory = NewDataFactory()
ts = tsoutil.GetCurrentTime()
)
insertMessages := dataFactory.GetMsgStreamTsInsertMsgs(2, insertChannelName, ts)
msgPack := msgstream.MsgPack{
BeginTs: ts,
......@@ -449,9 +444,7 @@ func TestDataSyncService_Close(t *testing.T) {
}
// 400 is the actual data
int64Pks := []primaryKey{
newInt64PrimaryKey(400),
}
int64Pks := []primaryKey{newInt64PrimaryKey(400)}
deleteMessages := dataFactory.GenMsgStreamDeleteMsgWithTs(0, int64Pks, insertChannelName, ts+1)
inMsgs := make([]msgstream.TsMsg, 0)
inMsgs = append(inMsgs, deleteMessages)
......@@ -502,11 +495,7 @@ func TestDataSyncService_Close(t *testing.T) {
insertStream, _ := factory.NewMsgStream(ctx)
insertStream.AsProducer([]string{insertChannelName})
ddStream, _ := factory.NewMsgStream(ctx)
ddStream.AsProducer([]string{ddlChannelName})
var insertMsgStream msgstream.MsgStream = insertStream
var ddMsgStream msgstream.MsgStream = ddStream
err = insertMsgStream.Produce(&msgPack)
assert.NoError(t, err)
......@@ -516,34 +505,28 @@ func TestDataSyncService_Close(t *testing.T) {
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
err = ddMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
// wait for delete
for sync.delBufferManager.GetEntriesNum(1) == 0 {
time.Sleep(100)
}
// wait for delete, no auto flush leads to all data in buffer.
require.Eventually(t, func() bool { return sync.delBufferManager.GetEntriesNum(1) == 1 },
5*time.Second, 100*time.Millisecond)
assert.Equal(t, 0, len(sync.flushListener))
// close and wait for flush
// close will trigger a force sync
sync.close()
for {
select {
case flushPack, ok := <-sync.flushListener:
assert.True(t, ok)
if flushPack.segmentID == 1 {
assert.True(t, len(flushPack.insertLogs) == 12)
assert.True(t, len(flushPack.statsLogs) == 1)
assert.True(t, len(flushPack.deltaLogs) == 1)
return
}
if flushPack.segmentID == 0 {
assert.True(t, len(flushPack.insertLogs) == 0)
assert.True(t, len(flushPack.statsLogs) == 0)
assert.True(t, len(flushPack.deltaLogs) == 0)
}
case <-sync.ctx.Done():
}
}
assert.Eventually(t, func() bool { return len(sync.flushListener) == 1 },
5*time.Second, 100*time.Millisecond)
flushPack, ok := <-sync.flushListener
assert.True(t, ok)
assert.Equal(t, UniqueID(1), flushPack.segmentID)
assert.True(t, len(flushPack.insertLogs) == 12)
assert.True(t, len(flushPack.statsLogs) == 1)
assert.True(t, len(flushPack.deltaLogs) == 1)
<-sync.ctx.Done()
// Double close is safe
sync.close()
<-sync.ctx.Done()
}
func genBytes() (rawData []byte) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册