Unverified commit fc15789d, authored by bigsheeper, committed by GitHub

Ensure compatibility of channel seek position and move syncPeriod to config (#20504)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
Parent c21497fb
......@@ -285,11 +285,13 @@ dataNode:
flowGraph:
maxQueueLength: 1024 # Maximum length of task queue in flowgraph
maxParallelism: 1024 # Maximum number of tasks executed in parallel in the flowgraph
flush:
segment:
# Max buffer size to flush for a single segment.
insertBufSize: 16777216 # Bytes, 16 MB
# Max buffer size to flush del for a single channel
deleteBufBytes: 67108864 # Bytes, 64MB
# The period to sync segments if buffer is not empty.
syncPeriod: 600 # Seconds, 10min
# Configures the system log output.
......
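The new datanode.segment.syncPeriod key is stored in plain seconds (600, i.e. 10 minutes); as the parameter changes further down in this diff show, it is converted to a Go time.Duration when the DataNode config is initialized. Below is a minimal standalone sketch of that conversion, assuming nothing beyond the standard library; parseSyncPeriod is an illustrative helper, not the actual Milvus loader.

package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseSyncPeriod turns the seconds value read from milvus.yaml into a Duration.
func parseSyncPeriod(raw string) (time.Duration, error) {
	secs, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid syncPeriod %q: %w", raw, err)
	}
	return time.Duration(secs) * time.Second, nil
}

func main() {
	period, err := parseSyncPeriod("600")
	if err != nil {
		panic(err)
	}
	fmt.Println(period) // 10m0s
}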
......@@ -756,7 +756,7 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error {
return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error())
}
if err := c.h.FinishDropChannel(channelName); err != nil {
return fmt.Errorf("FinishDropChannel failed, err=%s", err)
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
}
log.Info("removed channel assignment", zap.String("channel name", channelName))
return nil
......@@ -805,7 +805,7 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
log.Info("try to cleanup removal flag ", zap.String("channel name", channelName))
if err := c.h.FinishDropChannel(channelName); err != nil {
return fmt.Errorf("FinishDropChannel failed, err=%s", err)
return fmt.Errorf("FinishDropChannel failed, err=%w", err)
}
log.Info("removed channel assignment", zap.Any("channel", chToCleanUp))
......
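Both hunks above switch the error verb from %s to %w. This is standard Go error wrapping: with %w the original error stays in the chain, so callers can still match it with errors.Is or errors.As, whereas %s only copies its message into a fresh string. A small self-contained illustration; the sentinel error here is made up for the example.

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("watch info not found")

func main() {
	// %w keeps errNotFound in the error chain; %s flattens it to text.
	wrapped := fmt.Errorf("FinishDropChannel failed, err=%w", errNotFound)
	flattened := fmt.Errorf("FinishDropChannel failed, err=%s", errNotFound)

	fmt.Println(errors.Is(wrapped, errNotFound))   // true
	fmt.Println(errors.Is(flattened, errNotFound)) // false
}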
......@@ -88,7 +88,7 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
SeekPosition: h.getChannelCheckpoint(channel),
SeekPosition: h.GetChannelSeekPosition(channel, partitionID),
FlushedSegmentIds: flushedIDs.Collect(),
UnflushedSegmentIds: unflushedIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
......@@ -151,44 +151,119 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
return &datapb.VchannelInfo{
CollectionID: channel.CollectionID,
ChannelName: channel.Name,
SeekPosition: h.getChannelCheckpoint(channel),
SeekPosition: h.GetChannelSeekPosition(channel, partitionID),
FlushedSegmentIds: indexedIDs.Collect(),
UnflushedSegmentIds: unIndexedIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
}
}
func (h *ServerHandler) getChannelCheckpoint(channel *channel) *internalpb.MsgPosition {
seekPosition := h.s.meta.GetChannelCheckpoint(channel.Name)
if seekPosition != nil {
log.Info("channel seek position set from ChannelCP",
// getEarliestSegmentDMLPos returns the earliest dml position of segments,
// this is mainly for COMPATIBILITY with old version <=2.1.x
func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
var minPos *internalpb.MsgPosition
var minPosSegID int64
var minPosTs uint64
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.Name
})
for _, s := range segments {
if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
continue
}
if s.GetIsImporting() {
// Skip bulk insert segments.
continue
}
if s.GetState() == commonpb.SegmentState_Dropped {
continue
}
var segmentPosition *internalpb.MsgPosition
if s.GetDmlPosition() != nil {
segmentPosition = s.GetDmlPosition()
} else {
segmentPosition = s.GetStartPosition()
}
if minPos == nil || segmentPosition.Timestamp < minPos.Timestamp {
minPosSegID = s.GetID()
minPosTs = segmentPosition.GetTimestamp()
minPos = segmentPosition
}
}
if minPos != nil {
log.Info("getEarliestSegmentDMLPos done",
zap.Int64("segment ID", minPosSegID),
zap.Uint64("posTs", minPosTs),
zap.Time("posTime", tsoutil.PhysicalTime(minPosTs)))
}
return minPos
}
// getCollectionStartPos returns collection start position.
func (h *ServerHandler) getCollectionStartPos(channel *channel) *internalpb.MsgPosition {
// use collection start position when segment position is not found
var startPosition *internalpb.MsgPosition
if channel.StartPositions == nil {
collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil && err == nil {
startPosition = getCollectionStartPosition(channel.Name, collection)
}
log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
zap.String("channel", channel.Name),
zap.Uint64("position timestamp", seekPosition.Timestamp),
zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
zap.Uint64("posTs", startPosition.GetTimestamp()),
zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
)
} else {
// use collection start position when segment position is not found
if channel.StartPositions == nil {
collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil && err == nil {
seekPosition = getCollectionStartPosition(channel.Name, collection)
}
log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
zap.String("channel", channel.Name),
zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
)
} else {
// use passed start positions, skip to ask RootCoord.
seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
log.Info("segment position not found, setting channel seek position to channel start position",
zap.String("channel", channel.Name),
zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
)
}
// use passed start positions, skip to ask RootCoord.
startPosition = toMsgPosition(channel.Name, channel.StartPositions)
log.Info("segment position not found, setting channel seek position to channel start position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", startPosition.GetTimestamp()),
zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
)
}
return startPosition
}
// GetChannelSeekPosition gets channel seek position from:
// 1. Channel checkpoint meta;
// 2. Segments earliest dml position;
// 3. Collection start position;
// And would return if any position is valid.
func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
var seekPosition *internalpb.MsgPosition
seekPosition = h.s.meta.GetChannelCheckpoint(channel.Name)
if seekPosition != nil {
log.Info("channel seek position set from channel checkpoint meta",
zap.String("channel", channel.Name),
zap.Uint64("posTs", seekPosition.Timestamp),
zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
return seekPosition
}
seekPosition = h.getEarliestSegmentDMLPos(channel, partitionID)
if seekPosition != nil {
log.Info("channel seek position set from earliest segment dml position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", seekPosition.Timestamp),
zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
return seekPosition
}
return seekPosition
seekPosition = h.getCollectionStartPos(channel)
if seekPosition != nil {
log.Info("channel seek position set from collection start position",
zap.String("channel", channel.Name),
zap.Uint64("posTs", seekPosition.Timestamp),
zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
return seekPosition
}
log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid",
zap.String("channel", channel.Name))
return nil
}
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
......
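GetChannelSeekPosition therefore reduces to a first-non-nil fallback over three sources: the persisted channel checkpoint, the earliest segment DML position (kept for compatibility with versions <=2.1.x), and the collection start position. The sketch below distills that pattern with simplified stand-in types; firstValidPosition and msgPosition are illustrative names, not part of the Milvus codebase.

package main

import "fmt"

type msgPosition struct {
	ChannelName string
	Timestamp   uint64
}

// firstValidPosition returns the first non-nil position produced by the sources, in order.
func firstValidPosition(sources ...func() *msgPosition) *msgPosition {
	for _, source := range sources {
		if pos := source(); pos != nil {
			return pos
		}
	}
	return nil
}

func main() {
	fromCheckpoint := func() *msgPosition { return nil }                    // no checkpoint persisted yet
	fromSegments := func() *msgPosition { return &msgPosition{"ch1", 50} }  // earliest segment DML position
	fromCollection := func() *msgPosition { return &msgPosition{"ch1", 0} } // collection start position

	pos := firstValidPosition(fromCheckpoint, fromSegments, fromCollection)
	fmt.Printf("seek to %s at ts %d\n", pos.ChannelName, pos.Timestamp) // seek to ch1 at ts 50
}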
......@@ -1139,13 +1139,13 @@ func (m *meta) GetCompactionTo(segmentID int64) *SegmentInfo {
// UpdateChannelCheckpoint updates and saves channel checkpoint.
func (m *meta) UpdateChannelCheckpoint(vChannel string, pos *internalpb.MsgPosition) error {
m.Lock()
defer m.Unlock()
if pos == nil {
return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel)
}
m.Lock()
defer m.Unlock()
oldPosition, ok := m.channelCPs[vChannel]
if !ok || oldPosition.Timestamp < pos.Timestamp {
err := m.catalog.SaveChannelCheckpoint(m.ctx, vChannel, pos)
......
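Besides moving the nil check ahead of the lock, UpdateChannelCheckpoint only persists a checkpoint that moves forward in time, so a stale or duplicate position never rolls the seek position back. A simplified sketch of that rule, using an in-memory map as a stand-in for meta.channelCPs and omitting the catalog write; checkpointStore is an illustrative type, not the real meta object.

package main

import "fmt"

type msgPosition struct {
	ChannelName string
	Timestamp   uint64
}

type checkpointStore struct {
	channelCPs map[string]*msgPosition
}

// update records pos only if it advances the channel's checkpoint.
func (s *checkpointStore) update(vChannel string, pos *msgPosition) error {
	if pos == nil {
		return fmt.Errorf("channelCP is nil, vChannel=%s", vChannel)
	}
	old, ok := s.channelCPs[vChannel]
	if !ok || old.Timestamp < pos.Timestamp {
		// the real method saves to the catalog before updating the in-memory map
		s.channelCPs[vChannel] = pos
	}
	return nil
}

func main() {
	store := &checkpointStore{channelCPs: map[string]*msgPosition{}}
	_ = store.update("ch1", &msgPosition{ChannelName: "ch1", Timestamp: 100})
	_ = store.update("ch1", &msgPosition{ChannelName: "ch1", Timestamp: 50}) // ignored: would move the checkpoint backwards
	fmt.Println(store.channelCPs["ch1"].Timestamp)                           // 100
}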
......@@ -1646,61 +1646,98 @@ func TestDataNodeTtChannel(t *testing.T) {
})
}
func TestGetChannelCheckpoint(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "ch1",
Data: []byte{8, 9, 10},
},
func TestGetChannelSeekPosition(t *testing.T) {
startPos1 := []*commonpb.KeyDataPair{
{
Key: "ch1",
Data: []byte{1, 2, 3},
},
})
svr.meta.AddCollection(&collectionInfo{
ID: 1,
Schema: schema,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "ch0",
Data: []byte{11, 12, 13},
},
}
startPosNonExist := []*commonpb.KeyDataPair{
{
Key: "ch2",
Data: []byte{4, 5, 6},
},
})
t.Run("get non-existent channel", func(t *testing.T) {
channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "chx1", CollectionID: 0})
assert.Nil(t, channelCP)
})
t.Run("get no channelCP in meta", func(t *testing.T) {
channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch1", CollectionID: 0})
assert.NotNil(t, channelCP)
assert.EqualValues(t, []byte{8, 9, 10}, channelCP.GetMsgID())
channelCP = svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch0", CollectionID: 1})
assert.NotNil(t, channelCP)
assert.EqualValues(t, []byte{11, 12, 13}, channelCP.GetMsgID())
})
}
t.Run("empty collection", func(t *testing.T) {
channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch0_suffix", CollectionID: 2})
assert.Nil(t, channelCP)
})
tests := []struct {
testName string
channelCP *internalpb.MsgPosition
segDMLPos []*internalpb.MsgPosition
collStartPos []*commonpb.KeyDataPair
channelName string
expectedPos *internalpb.MsgPosition
}{
{"test-with-channelCP",
&internalpb.MsgPosition{ChannelName: "ch1", Timestamp: 100},
[]*internalpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}},
startPos1,
"ch1", &internalpb.MsgPosition{ChannelName: "ch1", Timestamp: 100}},
{"test-with-segmentDMLPos",
nil,
[]*internalpb.MsgPosition{{ChannelName: "ch1", Timestamp: 50}, {ChannelName: "ch1", Timestamp: 200}},
startPos1,
"ch1", &internalpb.MsgPosition{ChannelName: "ch1", Timestamp: 50}},
{"test-with-collStartPos",
nil,
nil,
startPos1,
"ch1", &internalpb.MsgPosition{ChannelName: "ch1", MsgID: startPos1[0].Data}},
{"test-non-exist-channel-1",
nil,
nil,
startPosNonExist,
"ch1", nil},
{"test-non-exist-channel-2",
nil,
nil,
nil,
"ch1", nil},
}
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
schema := newTestSchema()
if test.collStartPos != nil {
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
StartPositions: test.collStartPos,
})
}
for i, segPos := range test.segDMLPos {
seg := &datapb.SegmentInfo{
ID: UniqueID(i),
CollectionID: 0,
PartitionID: 0,
DmlPosition: segPos,
InsertChannel: "ch1",
}
err := svr.meta.AddSegment(NewSegmentInfo(seg))
assert.NoError(t, err)
}
if test.channelCP != nil {
err := svr.meta.UpdateChannelCheckpoint(test.channelCP.ChannelName, test.channelCP)
assert.NoError(t, err)
}
t.Run("with channel cp", func(t *testing.T) {
err := svr.meta.UpdateChannelCheckpoint("ch1", &internalpb.MsgPosition{
ChannelName: "ch1",
Timestamp: 100,
seekPos := svr.handler.(*ServerHandler).GetChannelSeekPosition(&channel{
Name: test.channelName,
CollectionID: 0}, allPartitionID)
if test.expectedPos == nil {
assert.True(t, seekPos == nil)
} else {
assert.Equal(t, test.expectedPos.ChannelName, seekPos.ChannelName)
assert.Equal(t, test.expectedPos.Timestamp, seekPos.Timestamp)
assert.ElementsMatch(t, test.expectedPos.MsgID, seekPos.MsgID)
}
})
assert.NoError(t, err)
channelCP := svr.handler.(*ServerHandler).getChannelCheckpoint(&channel{Name: "ch1", CollectionID: 1})
assert.NotNil(t, channelCP)
assert.True(t, channelCP.ChannelName == "ch1")
assert.True(t, channelCP.Timestamp == 100)
})
}
}
func TestGetDataVChanPositions(t *testing.T) {
......
......@@ -520,7 +520,7 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, startP
if !loaded {
buffer, err = newBufferData(collSchema)
if err != nil {
return fmt.Errorf("newBufferData failed, segment=%d, channel=%s, err=%s", currentSegID, ibNode.channelName, err)
return fmt.Errorf("newBufferData failed, segment=%d, channel=%s, err=%w", currentSegID, ibNode.channelName, err)
}
}
......
......@@ -59,7 +59,7 @@ func (ttn *ttNode) Name() string {
// Operate handles input messages, implementing flowgraph.Node
func (ttn *ttNode) Operate(in []Msg) []Msg {
if in == nil {
log.Debug("type assertion failed for flowGraphMsg because it's nil")
log.Warn("type assertion failed for flowGraphMsg because it's nil")
return []Msg{}
}
......
......@@ -134,7 +134,7 @@ func (s *Segment) evictHistoryInsertBuffer(endPos *internalpb.MsgPosition) {
}
s.historyInsertBuf = tmpBuffers
ts, _ := tsoutil.ParseTS(endPos.Timestamp)
log.Debug("evictHistoryInsertBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
log.Info("evictHistoryInsertBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
}
// rollDeleteBuffer moves curDeleteBuf to historyDeleteBuf, and then sets curDeleteBuf to nil.
......@@ -157,7 +157,7 @@ func (s *Segment) evictHistoryDeleteBuffer(endPos *internalpb.MsgPosition) {
}
s.historyDeleteBuf = tmpBuffers
ts, _ := tsoutil.ParseTS(endPos.Timestamp)
log.Debug("evictHistoryDeleteBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
log.Info("evictHistoryDeleteBuffer done", zap.Int64("segmentID", s.segmentID), zap.Time("ts", ts), zap.String("channel", endPos.ChannelName))
}
func (s *Segment) isBufferEmpty() bool {
......
......@@ -17,15 +17,9 @@
package datanode
import (
"time"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
const (
syncPeriod = 10 * time.Minute // TODO: move to config?
)
// segmentSyncPolicy sync policy applies to segment
type segmentSyncPolicy func(segment *Segment, ts Timestamp) bool
......@@ -34,6 +28,7 @@ func syncPeriodically() segmentSyncPolicy {
return func(segment *Segment, ts Timestamp) bool {
endTime := tsoutil.PhysicalTime(ts)
lastSyncTime := tsoutil.PhysicalTime(segment.lastSyncTs)
return endTime.Sub(lastSyncTime) >= syncPeriod && !segment.isBufferEmpty()
return endTime.Sub(lastSyncTime) >= Params.DataNodeCfg.SyncPeriod &&
!segment.isBufferEmpty()
}
}
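With the hard-coded 10-minute constant removed, the periodic sync policy now reads its interval from Params.DataNodeCfg.SyncPeriod. The condensed standalone version below mirrors the same condition (buffer not empty and at least one sync period elapsed since the last sync); segment and shouldSync are simplified stand-ins, not the DataNode types.

package main

import (
	"fmt"
	"time"
)

type segment struct {
	lastSync    time.Time
	bufferEmpty bool
}

// shouldSync reports whether a segment is due for a periodic sync.
func shouldSync(seg segment, now time.Time, syncPeriod time.Duration) bool {
	return now.Sub(seg.lastSync) >= syncPeriod && !seg.bufferEmpty
}

func main() {
	t0 := time.Now()
	syncPeriod := 10 * time.Minute // default derived from datanode.segment.syncPeriod (600 s)

	fmt.Println(shouldSync(segment{t0, false}, t0.Add(syncPeriod), syncPeriod))   // true: stale and buffer not empty
	fmt.Println(shouldSync(segment{t0, true}, t0.Add(syncPeriod), syncPeriod))    // false: buffer empty
	fmt.Println(shouldSync(segment{t0, false}, t0.Add(syncPeriod/2), syncPeriod)) // false: not stale yet
}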
......@@ -35,10 +35,10 @@ func TestSyncPeriodically(t *testing.T) {
isBufferEmpty bool
shouldSync bool
}{
{"test buffer empty and stale", t0, t0.Add(syncPeriod), true, false},
{"test buffer empty and not stale", t0, t0.Add(syncPeriod / 2), true, false},
{"test buffer not empty and stale", t0, t0.Add(syncPeriod), false, true},
{"test buffer not empty and not stale", t0, t0.Add(syncPeriod / 2), false, false},
{"test buffer empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), true, false},
{"test buffer empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), true, false},
{"test buffer not empty and stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod), false, true},
{"test buffer not empty and not stale", t0, t0.Add(Params.DataNodeCfg.SyncPeriod / 2), false, false},
}
for _, test := range tests {
......
......@@ -1338,8 +1338,11 @@ type dataNodeConfig struct {
FlowGraphMaxQueueLength int32
FlowGraphMaxParallelism int32
FlushInsertBufferSize int64
FlushDeleteBufferBytes int64
// segment
FlushInsertBufferSize int64
FlushDeleteBufferBytes int64
SyncPeriod time.Duration
Alias string // Different datanode in one machine
......@@ -1359,6 +1362,7 @@ func (p *dataNodeConfig) init(base *BaseTable) {
p.initFlowGraphMaxParallelism()
p.initFlushInsertBufferSize()
p.initFlushDeleteBufferSize()
p.initSyncPeriod()
p.initIOConcurrency()
p.initChannelWatchPath()
......@@ -1378,7 +1382,7 @@ func (p *dataNodeConfig) initFlowGraphMaxParallelism() {
}
func (p *dataNodeConfig) initFlushInsertBufferSize() {
bufferSize := p.Base.LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.flush.insertBufSize"}, "0")
bufferSize := p.Base.LoadWithDefault2([]string{"DATA_NODE_IBUFSIZE", "datanode.segment.insertBufSize"}, "0")
bs, err := strconv.ParseInt(bufferSize, 10, 64)
if err != nil {
panic(err)
......@@ -1387,11 +1391,16 @@ func (p *dataNodeConfig) initFlushInsertBufferSize() {
}
func (p *dataNodeConfig) initFlushDeleteBufferSize() {
deleteBufBytes := p.Base.ParseInt64WithDefault("datanode.flush.deleteBufBytes",
deleteBufBytes := p.Base.ParseInt64WithDefault("datanode.segment.deleteBufBytes",
64*1024*1024)
p.FlushDeleteBufferBytes = deleteBufBytes
}
func (p *dataNodeConfig) initSyncPeriod() {
syncPeriodInSeconds := p.Base.ParseInt64WithDefault("datanode.segment.syncPeriod", 600)
p.SyncPeriod = time.Duration(syncPeriodInSeconds) * time.Second
}
func (p *dataNodeConfig) initChannelWatchPath() {
p.ChannelWatchSubPath = "channelwatch"
}
......
......@@ -321,6 +321,10 @@ func TestComponentParam(t *testing.T) {
size := Params.FlushInsertBufferSize
t.Logf("FlushInsertBufferSize: %d", size)
period := Params.SyncPeriod
t.Logf("SyncPeriod: %v", period)
assert.Equal(t, 10*time.Minute, Params.SyncPeriod)
Params.CreatedTime = time.Now()
t.Logf("CreatedTime: %v", Params.CreatedTime)
......