未验证 提交 3711fcd8 编写于 作者: C congqixia 提交者: GitHub

Add more info for Delta log compaction (#10083)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 42d56575
...@@ -19,6 +19,7 @@ package datanode ...@@ -19,6 +19,7 @@ package datanode
import ( import (
"context" "context"
"encoding/binary" "encoding/binary"
"math"
"path" "path"
"strconv" "strconv"
"sync" "sync"
...@@ -55,18 +56,31 @@ type deleteNode struct { ...@@ -55,18 +56,31 @@ type deleteNode struct {
type DelDataBuf struct { type DelDataBuf struct {
delData *DeleteData delData *DeleteData
size int64 size int64
tsFrom Timestamp
tsTo Timestamp
} }
func (ddb *DelDataBuf) updateSize(size int64) { func (ddb *DelDataBuf) updateSize(size int64) {
ddb.size += size ddb.size += size
} }
func (ddb *DelDataBuf) updateTimeRange(tr TimeRange) {
if tr.timestampMin < ddb.tsFrom {
ddb.tsFrom = tr.timestampMin
}
if tr.timestampMax > ddb.tsTo {
ddb.tsTo = tr.timestampMax
}
}
func newDelDataBuf() *DelDataBuf { func newDelDataBuf() *DelDataBuf {
return &DelDataBuf{ return &DelDataBuf{
delData: &DeleteData{ delData: &DeleteData{
Data: make(map[string]int64), Data: make(map[string]int64),
}, },
size: 0, size: 0,
tsFrom: math.MaxUint64,
tsTo: 0,
} }
} }
...@@ -78,7 +92,7 @@ func (dn *deleteNode) Close() { ...@@ -78,7 +92,7 @@ func (dn *deleteNode) Close() {
log.Info("Flowgraph Delete Node closing") log.Info("Flowgraph Delete Node closing")
} }
func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg) error { func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) error {
log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys)) log.Debug("bufferDeleteMsg", zap.Any("primary keys", msg.PrimaryKeys))
segIDToPkMap := make(map[UniqueID][]int64) segIDToPkMap := make(map[UniqueID][]int64)
...@@ -115,6 +129,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg) error { ...@@ -115,6 +129,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg) error {
// store // store
delDataBuf.(*DelDataBuf).updateSize(int64(rows)) delDataBuf.(*DelDataBuf).updateSize(int64(rows))
delDataBuf.(*DelDataBuf).updateTimeRange(tr)
dn.delBuf.Store(segID, delDataBuf) dn.delBuf.Store(segID, delDataBuf)
} }
...@@ -159,7 +174,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg { ...@@ -159,7 +174,7 @@ func (dn *deleteNode) Operate(in []Msg) []Msg {
} }
for _, msg := range fgMsg.deleteMessages { for _, msg := range fgMsg.deleteMessages {
if err := dn.bufferDeleteMsg(msg); err != nil { if err := dn.bufferDeleteMsg(msg, fgMsg.timeRange); err != nil {
log.Error("buffer delete msg failed", zap.Error(err)) log.Error("buffer delete msg failed", zap.Error(err))
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册