Unverified · Commit daa5ed57 · authored by wayblink, committed by GitHub

Save binlog timestampFrom, timestampTo meta when compact (#26210)

Signed-off-by: wayblink <anyang.wang@zilliz.com>
Parent 153860a6
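
What the change does: while a compaction task merges segments, it now tracks the oldest and newest row timestamps of each batch of rows it writes out, and saves that range on the resulting binlog meta as TimestampFrom/TimestampTo. Below is a minimal, self-contained sketch of that bookkeeping idea; binlogMeta and timestampRange are illustrative stand-ins written for this note, not code from the commit or from datapb.

package main

import "fmt"

// binlogMeta stands in for the TimestampFrom/TimestampTo fields this commit
// starts populating on datapb.Binlog (the real message lives in Milvus' proto).
type binlogMeta struct {
    TimestampFrom uint64 // oldest row timestamp written into the binlog
    TimestampTo   uint64 // newest row timestamp written into the binlog
}

// timestampRange computes the range over one flushed batch of row timestamps.
func timestampRange(rows []uint64) binlogMeta {
    var m binlogMeta
    for i, ts := range rows {
        if i == 0 || ts < m.TimestampFrom {
            m.TimestampFrom = ts
        }
        if i == 0 || ts > m.TimestampTo {
            m.TimestampTo = ts
        }
    }
    return m
}

func main() {
    m := timestampRange([]uint64{443000010, 443000002, 443000007})
    fmt.Println(m.TimestampFrom, m.TimestampTo) // 443000002 443000010
}

Running the sketch prints 443000002 443000010, the range that would be persisted for that batch.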
@@ -807,6 +807,11 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
     for _, l := range binlogs.GetBinlogs() {
         // TODO, we should probably estimate expired log entries by total rows in binlog and the ralationship of timeTo, timeFrom and expire time
         if l.TimestampTo < compactTime.expireTime {
+            log.RatedDebug(10, "mark binlog as expired",
+                zap.Int64("segmentID", segment.ID),
+                zap.Int64("binlogID", l.GetLogID()),
+                zap.Uint64("binlogTimestampTo", l.TimestampTo),
+                zap.Uint64("compactExpireTime", compactTime.expireTime))
             totalExpiredRows += int(l.GetEntriesNum())
             totalExpiredSize += l.GetLogSize()
         }
@@ -814,8 +819,9 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
 }
 if float32(totalExpiredRows)/float32(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold || totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize {
-    log.Info("total expired entities is too much, trigger compaction", zap.Int64("segment", segment.ID),
-        zap.Int("expired rows", totalExpiredRows), zap.Int64("expired log size", totalExpiredSize))
+    log.Info("total expired entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),
+        zap.Int("expiredRows", totalExpiredRows), zap.Int64("expiredLogSize", totalExpiredSize),
+        zap.Bool("createdByCompaction", segment.CreatedByCompaction), zap.Int64s("compactionFrom", segment.CompactionFrom))
     return true
 }
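
The trigger-side hunks above consume the new meta: a binlog whose newest row timestamp (TimestampTo) already predates the compaction expire time can be counted as fully expired without being opened, and the accumulated expired rows and bytes are weighed against the single-compaction thresholds. A reduced sketch of that decision follows; binlogInfo and shouldCompactForExpiry are simplified stand-ins for the real SegmentInfo/Params-based code, not part of this commit.

// binlogInfo is a simplified stand-in for the per-binlog meta used by the trigger.
type binlogInfo struct {
    TimestampTo uint64 // newest row timestamp in the binlog (persisted by this commit)
    EntriesNum  int64
    LogSize     int64
}

// shouldCompactForExpiry mirrors the expiry branch of ShouldDoSingleCompaction:
// a binlog whose newest row is older than expireTime contributes all of its
// entries and bytes to the expired totals.
func shouldCompactForExpiry(binlogs []binlogInfo, numRows int64, expireTime uint64,
    ratioThreshold float32, maxExpiredSize int64) bool {
    var expiredRows, expiredSize int64
    for _, l := range binlogs {
        if l.TimestampTo < expireTime {
            expiredRows += l.EntriesNum
            expiredSize += l.LogSize
        }
    }
    return float32(expiredRows)/float32(numRows) >= ratioThreshold ||
        expiredSize > maxExpiredSize
}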
@@ -265,8 +265,12 @@ func (t *compactionTask) merge(
         return false
     }
-    addInsertFieldPath := func(inPaths map[UniqueID]*datapb.FieldBinlog) {
+    addInsertFieldPath := func(inPaths map[UniqueID]*datapb.FieldBinlog, timestampFrom, timestampTo int64) {
         for fID, path := range inPaths {
+            for _, binlog := range path.GetBinlogs() {
+                binlog.TimestampTo = uint64(timestampTo)
+                binlog.TimestampFrom = uint64(timestampFrom)
+            }
             tmpBinlog, ok := insertField2Path[fID]
             if !ok {
                 tmpBinlog = path
@@ -316,6 +320,12 @@ func (t *compactionTask) merge(
     downloadTimeCost := time.Duration(0)
     uploadInsertTimeCost := time.Duration(0)
+    // initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
+    var (
+        timestampTo   int64 = -1
+        timestampFrom int64 = -1
+    )
     for _, path := range unMergedInsertlogs {
         downloadStart := time.Now()
         data, err := t.download(ctxTimeout, path)
@@ -330,6 +340,7 @@ func (t *compactionTask) merge(
         log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
         return nil, nil, 0, err
     }
     for iter.HasNext() {
         vInter, _ := iter.Next()
         v, ok := vInter.(*storage.Value)
@@ -349,6 +360,14 @@ func (t *compactionTask) merge(
             continue
         }
+        // Update timestampFrom, timestampTo
+        if v.Timestamp < timestampFrom || timestampFrom == -1 {
+            timestampFrom = v.Timestamp
+        }
+        if v.Timestamp > timestampTo || timestampFrom == -1 {
+            timestampTo = v.Timestamp
+        }
         row, ok := v.Value.(map[UniqueID]interface{})
         if !ok {
             log.Warn("transfer interface to map wrong", zap.Strings("path", path))
@@ -371,7 +390,9 @@ func (t *compactionTask) merge(
             return nil, nil, 0, err
         }
         uploadInsertTimeCost += time.Since(uploadInsertStart)
-        addInsertFieldPath(inPaths)
+        addInsertFieldPath(inPaths, timestampFrom, timestampTo)
+        timestampFrom = -1
+        timestampTo = -1
         addStatFieldPath(statsPaths)
         fID2Content = make(map[int64][]interface{})
@@ -389,8 +410,7 @@ func (t *compactionTask) merge(
         return nil, nil, 0, err
     }
     uploadInsertTimeCost += time.Since(uploadInsertStart)
-    addInsertFieldPath(inPaths)
+    addInsertFieldPath(inPaths, timestampFrom, timestampTo)
     addStatFieldPath(statsPaths)
     numRows += int64(currentRows)
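
The compactionTask.merge hunks above keep a running minimum and maximum of row timestamps for the batch currently being accumulated, stamp that range onto the binlogs uploaded for the batch via addInsertFieldPath, and then reset both trackers to the -1 sentinel so the next batch records only its own rows. The sketch below shows the same pattern in isolation; mergeRanges and its flush callback are hypothetical simplifications written for this note, not the real merge signature.

// mergeRanges walks row timestamps in batches of batchSize; each flush receives
// the exact timestamp range of the rows it contains, mirroring how merge() now
// calls addInsertFieldPath and then resets timestampFrom/timestampTo to -1.
func mergeRanges(rowTimestamps []int64, batchSize int, flush func(from, to int64)) {
    var from, to int64 = -1, -1 // -1 marks "no rows seen yet for this batch"
    count := 0
    for _, ts := range rowTimestamps {
        if from == -1 || ts < from {
            from = ts
        }
        if to == -1 || ts > to {
            to = ts
        }
        count++
        if count == batchSize {
            flush(from, to)
            from, to = -1, -1 // next batch tracks its own range
            count = 0
        }
    }
    if count > 0 {
        flush(from, to) // final partial batch
    }
}

Because each flushed batch gets its own range, a binlog holding a single row ends up with TimestampFrom equal to TimestampTo, which is exactly what the new test case below checks.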
@@ -310,6 +310,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
     assert.Equal(t, int64(2), numOfRow)
     assert.Equal(t, 1, len(inPaths[0].GetBinlogs()))
     assert.Equal(t, 1, len(statsPaths))
+    assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
+    assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
 })
 t.Run("Merge without expiration2", func(t *testing.T) {
     alloc := NewAllocatorFactory(1)
@@ -347,6 +349,53 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
     assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
     assert.Equal(t, 1, len(statsPaths))
     assert.Equal(t, 2, len(statsPaths[0].GetBinlogs()))
+    assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
+    assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
+})
+// set Params.DataNodeCfg.BinLogMaxSize.Key = 1 to generate multi binlogs, each has only one row
+t.Run("Merge without expiration3", func(t *testing.T) {
+    alloc := NewAllocatorFactory(1)
+    mockbIO := &binlogIO{cm, alloc}
+    Params.CommonCfg.EntityExpirationTTL = 0
+    flushInsertBufferSize := Params.DataNodeCfg.BinLogMaxSize
+    defer func() {
+        Params.DataNodeCfg.BinLogMaxSize = flushInsertBufferSize
+    }()
+    Params.DataNodeCfg.BinLogMaxSize = 1
+    iData := genInsertDataWithExpiredTS()
+    var allPaths [][]string
+    inpath, _, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+    assert.NoError(t, err)
+    assert.Equal(t, 12, len(inpath))
+    binlogNum := len(inpath[0].GetBinlogs())
+    assert.Equal(t, 1, binlogNum)
+    for idx := 0; idx < binlogNum; idx++ {
+        var ps []string
+        for _, path := range inpath {
+            ps = append(ps, path.GetBinlogs()[idx].GetLogPath())
+        }
+        allPaths = append(allPaths, ps)
+    }
+    dm := map[interface{}]Timestamp{
+        1: 10000,
+    }
+    ct := &compactionTask{Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1)}
+    inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
+    assert.NoError(t, err)
+    assert.Equal(t, int64(2), numOfRow)
+    assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
+    assert.Equal(t, 1, len(statsPaths))
+    for _, inpath := range inPaths {
+        assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampFrom())
+        assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampTo())
+        // as only one row for each binlog, timestampTo == timestampFrom
+        assert.Equal(t, inpath.GetBinlogs()[0].GetTimestampTo(), inpath.GetBinlogs()[0].GetTimestampFrom())
+    }
 })
 t.Run("Merge with expiration", func(t *testing.T) {