未验证 提交 68a65873 编写于 作者: C congqixia 提交者: GitHub

Set insert&stats binlog timestamp range (#19005)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 9cd19f53
......@@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
......@@ -108,6 +109,8 @@ type BufferData struct {
buffer *InsertData
size int64
limit int64
tsFrom Timestamp
tsTo Timestamp
}
// newBufferData needs an input dimension to calculate the limit of this buffer
......@@ -133,7 +136,12 @@ func newBufferData(dimension int64) (*BufferData, error) {
limit := Params.DataNodeCfg.FlushInsertBufferSize / (dimension * 4)
//TODO::xige-16 eval vec and string field
return &BufferData{&InsertData{Data: make(map[UniqueID]storage.FieldData)}, 0, limit}, nil
return &BufferData{
buffer: &InsertData{Data: make(map[UniqueID]storage.FieldData)},
size: 0,
limit: limit,
tsFrom: math.MaxUint64,
tsTo: 0}, nil
}
func (bd *BufferData) effectiveCap() int64 {
......@@ -144,6 +152,16 @@ func (bd *BufferData) updateSize(no int64) {
bd.size += no
}
// updateTimeRange update BufferData tsFrom, tsTo range according to input time range
func (bd *BufferData) updateTimeRange(tr TimeRange) {
if tr.timestampMin < bd.tsFrom {
bd.tsFrom = tr.timestampMin
}
if tr.timestampMax > bd.tsTo {
bd.tsTo = tr.timestampMax
}
}
func (ibNode *insertBufferNode) Name() string {
return "ibNode-" + ibNode.channelName
}
......@@ -544,8 +562,17 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
// Maybe there are large write zoom if frequent insert requests are met.
buffer.buffer = storage.MergeInsertData(buffer.buffer, addedBuffer)
tsData, err := storage.GetTimestampFromInsertData(addedBuffer)
if err != nil {
log.Warn("no timestamp field found in insert msg", zap.Error(err))
return err
}
// update buffer size
buffer.updateSize(int64(msg.NRows()))
// update timestamp range
buffer.updateTimeRange(ibNode.getTimestampRange(tsData))
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.GetNodeID()), metrics.InsertLabel).Add(float64(len(msg.RowData)))
// store in buffer
......@@ -557,6 +584,23 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
return nil
}
func (ibNode *insertBufferNode) getTimestampRange(tsData *storage.Int64FieldData) TimeRange {
tr := TimeRange{
timestampMin: math.MaxUint64,
timestampMax: 0,
}
for _, data := range tsData.Data {
if data < int64(tr.timestampMin) {
tr.timestampMin = Timestamp(data)
}
if data > int64(tr.timestampMax) {
tr.timestampMax = Timestamp(data)
}
}
return tr
}
// writeHardTimeTick writes timetick once insertBufferNode operates.
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp, segmentIDs []int64) {
ibNode.ttLogger.LogTs(ts)
......
......@@ -19,6 +19,7 @@ package datanode
import (
"context"
"errors"
"math"
"sync"
"testing"
"time"
......@@ -1030,6 +1031,56 @@ func TestInsertBufferNode_BufferData(te *testing.T) {
assert.Nil(t, idata)
}
})
}
}
func TestInsertBufferNode_BufferData_updateTimeRange(t *testing.T) {
Params.DataNodeCfg.FlushInsertBufferSize = 16 * (1 << 20) // 16 MB
type testCase struct {
tag string
trs []TimeRange
expectFrom Timestamp
expectTo Timestamp
}
cases := []testCase{
{
tag: "no input range",
expectTo: 0,
expectFrom: math.MaxUint64,
},
{
tag: "single range",
trs: []TimeRange{
{timestampMin: 100, timestampMax: 200},
},
expectFrom: 100,
expectTo: 200,
},
{
tag: "multiple range",
trs: []TimeRange{
{timestampMin: 150, timestampMax: 250},
{timestampMin: 100, timestampMax: 200},
{timestampMin: 50, timestampMax: 180},
},
expectFrom: 50,
expectTo: 250,
},
}
for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) {
bd, err := newBufferData(16)
require.NoError(t, err)
for _, tr := range tc.trs {
bd.updateTimeRange(tr)
}
assert.Equal(t, tc.expectFrom, bd.tsFrom)
assert.Equal(t, tc.expectTo, bd.tsTo)
})
}
}
......@@ -391,8 +391,8 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segStats []by
kvs[key] = blob.Value[:]
field2Insert[fieldID] = &datapb.Binlog{
EntriesNum: data.size,
TimestampFrom: 0, //TODO
TimestampTo: 0, //TODO,
TimestampFrom: data.tsFrom,
TimestampTo: data.tsTo,
LogPath: key,
LogSize: int64(fieldMemorySize[fieldID]),
}
......
......@@ -780,6 +780,24 @@ func GetPkFromInsertData(collSchema *schemapb.CollectionSchema, data *InsertData
return realPfData, nil
}
// GetTimestampFromInsertData returns the Int64FieldData for timestamp field.
func GetTimestampFromInsertData(data *InsertData) (*Int64FieldData, error) {
if data == nil {
return nil, errors.New("try to get timestamp from nil insert data")
}
fieldData, ok := data.Data[common.TimeStampField]
if !ok {
return nil, errors.New("no timestamp field in insert data")
}
ifd, ok := fieldData.(*Int64FieldData)
if !ok {
return nil, errors.New("timestamp field is not Int64")
}
return ifd, nil
}
func boolFieldDataToPbBytes(field *BoolFieldData) ([]byte, error) {
arr := &schemapb.BoolArray{Data: field.Data}
return proto.Marshal(arr)
......
......@@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCheckTsField(t *testing.T) {
......@@ -1069,6 +1070,63 @@ func TestGetPkFromInsertData(t *testing.T) {
assert.Equal(t, []int64{1, 2, 3}, d.(*Int64FieldData).Data)
}
func Test_GetTimestampFromInsertData(t *testing.T) {
type testCase struct {
tag string
data *InsertData
expectError bool
expectData *Int64FieldData
}
cases := []testCase{
{
tag: "nil data",
expectError: true,
},
{
tag: "no timestamp",
expectError: true,
data: &InsertData{
Data: map[FieldID]FieldData{
common.StartOfUserFieldID: &Int64FieldData{Data: []int64{1, 2, 3}},
},
},
},
{
tag: "timestamp wrong type",
expectError: true,
data: &InsertData{
Data: map[FieldID]FieldData{
common.TimeStampField: &Int32FieldData{Data: []int32{1, 2, 3}},
},
},
},
{
tag: "normal insert data",
data: &InsertData{
Data: map[FieldID]FieldData{
common.TimeStampField: &Int64FieldData{Data: []int64{1, 2, 3}},
common.StartOfUserFieldID: &Int32FieldData{Data: []int32{1, 2, 3}},
},
},
expectData: &Int64FieldData{Data: []int64{1, 2, 3}},
},
}
for _, tc := range cases {
t.Run(tc.tag, func(t *testing.T) {
result, err := GetTimestampFromInsertData(tc.data)
if tc.expectError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tc.expectData, result)
}
})
}
}
func Test_boolFieldDataToBytes(t *testing.T) {
field := &BoolFieldData{Data: []bool{true, false}}
bs, err := boolFieldDataToPbBytes(field)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册