未验证 提交 715878c8 编写于 作者: B bigsheeper 提交者: GitHub

Set delBuf's start-end position of compacted segment (#20615)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 cb294d14
......@@ -295,6 +295,7 @@ func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) {
tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
ddb.updateTimeRange(tr)
ddb.updateStartAndEndPosition(buf.startPos, buf.endPos)
ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...)
ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...)
......
......@@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
func genTestCollectionSchema(dim int64) *schemapb.CollectionSchema {
......@@ -162,10 +163,12 @@ func Test_CompactSegBuff(t *testing.T) {
//2. set up deleteDataBuf for seg1 and seg2
delDataBuf1 := newDelDataBuf()
delDataBuf1.EntriesNum++
delDataBuf1.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
delBufferManager.Store(segID1, delDataBuf1)
heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
delDataBuf2 := newDelDataBuf()
delDataBuf2.EntriesNum++
delDataBuf2.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
delBufferManager.Store(segID2, delDataBuf2)
heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
......@@ -180,4 +183,16 @@ func Test_CompactSegBuff(t *testing.T) {
assert.False(t, seg1Exist)
assert.False(t, seg2Exist)
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID))
//5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501)
delBufferManager.channel.rollDeleteBuffer(compactedToSegID)
_, segCompactedToExist := delBufferManager.Load(compactedToSegID)
assert.False(t, segCompactedToExist)
delBufferManager.channel.evictHistoryDeleteBuffer(compactedToSegID, &internalpb.MsgPosition{
Timestamp: 100,
})
cp := delBufferManager.channel.getChannelCheckpoint(&internalpb.MsgPosition{
Timestamp: 200,
})
assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册