未验证 提交 9551dc3b 编写于 作者: S sunby 提交者: GitHub

Fix segment auto flush bug (#7085)

issue: #7084
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 e771bda9
// Copyright (C) 2019-2020 Zilliz. All rights reserved.//// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
......@@ -77,6 +79,7 @@ type Server struct {
serverLoopCancel context.CancelFunc
serverLoopWg sync.WaitGroup
isServing ServerState
helper ServerHelper
kvClient *etcdkv.EtcdKV
meta *meta
......@@ -97,6 +100,16 @@ type Server struct {
rootCoordClientCreator rootCoordCreatorFunc
}
type ServerHelper struct {
eventAfterHandleDataNodeTt func()
}
func defaultServerHelper() ServerHelper {
return ServerHelper{
eventAfterHandleDataNodeTt: func() {},
}
}
type Option func(svr *Server)
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
......@@ -105,6 +118,12 @@ func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
}
}
func SetServerHelper(helper ServerHelper) Option {
return func(svr *Server) {
svr.helper = helper
}
}
// CreateServer create `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
rand.Seed(time.Now().UnixNano())
......@@ -114,6 +133,7 @@ func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option
flushCh: make(chan UniqueID, 1024),
dataClientCreator: defaultDataNodeCreatorFunc,
rootCoordClientCreator: defaultRootCoordCreatorFunc,
helper: defaultServerHelper(),
}
for _, opt := range opts {
......@@ -352,6 +372,7 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
ch := ttMsg.ChannelName
ts := ttMsg.Timestamp
s.segmentManager.ExpireAllocations(ch, ts)
segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
if err != nil {
log.Warn("get flushable segments failed", zap.Error(err))
......@@ -375,8 +396,8 @@ func (s *Server) startDataNodeTtLoop(ctx context.Context) {
if len(segmentInfos) > 0 {
s.cluster.Flush(segmentInfos)
}
s.segmentManager.ExpireAllocations(ch, ts)
}
s.helper.eventAfterHandleDataNodeTt()
}
}
......
......@@ -11,13 +11,12 @@ package datacoord
import (
"context"
"math/rand"
"path"
"strconv"
"testing"
"time"
"math/rand"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
......@@ -29,6 +28,10 @@ import (
"go.etcd.io/etcd/clientv3"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func TestGetSegmentInfoChannel(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
......@@ -622,6 +625,70 @@ func TestDataNodeTtChannel(t *testing.T) {
assert.EqualValues(t, 1, len(flushReq.SegmentIDs))
assert.EqualValues(t, assign.SegID, flushReq.SegmentIDs[0])
})
t.Run("test expire allocation after receiving tt msg", func(t *testing.T) {
ch := make(chan interface{}, 1)
helper := ServerHelper{
eventAfterHandleDataNodeTt: func() { ch <- struct{}{} },
}
svr := newTestServer(t, nil, SetServerHelper(helper))
defer closeTestServer(t, svr)
svr.meta.AddCollection(&datapb.CollectionInfo{
ID: 0,
Schema: newTestSchema(),
Partitions: []int64{0},
})
ttMsgStream, err := svr.msFactory.NewMsgStream(context.TODO())
assert.Nil(t, err)
ttMsgStream.AsProducer([]string{Params.TimeTickChannelName})
ttMsgStream.Start()
defer ttMsgStream.Close()
info := &datapb.DataNodeInfo{
Address: "localhost:7777",
Version: 0,
Channels: []*datapb.ChannelStatus{
{
Name: "ch-1",
State: datapb.ChannelWatchState_Complete,
},
},
}
node := NewNodeInfo(context.TODO(), info)
node.client, err = newMockDataNodeClient(1, ch)
assert.Nil(t, err)
svr.cluster.Register(node)
resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{
NodeID: 0,
PeerRole: "",
SegmentIDRequests: []*datapb.SegmentIDRequest{
{
CollectionID: 0,
PartitionID: 0,
ChannelName: "ch-1",
Count: 100,
},
},
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.EqualValues(t, 1, len(resp.SegIDAssignments))
assignedSegmentID := resp.SegIDAssignments[0].SegID
segment := svr.meta.GetSegment(assignedSegmentID)
assert.EqualValues(t, 1, len(segment.allocations))
msgPack := msgstream.MsgPack{}
msg := genMsg(commonpb.MsgType_DataNodeTt, "ch-1", resp.SegIDAssignments[0].ExpireTime)
msgPack.Msgs = append(msgPack.Msgs, msg)
ttMsgStream.Produce(&msgPack)
<-ch
segment = svr.meta.GetSegment(assignedSegmentID)
assert.EqualValues(t, 0, len(segment.allocations))
})
}
func TestGetVChannelPos(t *testing.T) {
......@@ -816,10 +883,10 @@ func TestGetRecoveryInfo(t *testing.T) {
})
}
func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
func newTestServer(t *testing.T, receiveCh chan interface{}, opts ...Option) *Server {
Params.Init()
Params.TimeTickChannelName += strconv.Itoa(rand.Int())
Params.StatisticsChannelName += strconv.Itoa(rand.Int())
Params.TimeTickChannelName = strconv.Itoa(rand.Int())
Params.StatisticsChannelName = strconv.Itoa(rand.Int())
var err error
factory := msgstream.NewPmsFactory()
m := map[string]interface{}{
......@@ -836,7 +903,7 @@ func newTestServer(t *testing.T, receiveCh chan interface{}) *Server {
_, err = etcdCli.Delete(context.Background(), sessKey, clientv3.WithPrefix())
assert.Nil(t, err)
svr, err := CreateServer(context.TODO(), factory)
svr, err := CreateServer(context.TODO(), factory, opts...)
assert.Nil(t, err)
svr.dataClientCreator = func(ctx context.Context, addr string) (types.DataNode, error) {
return newMockDataNodeClient(0, receiveCh)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册