提交 7270d18c 编写于 作者: S sunby 提交者: yefu.chen

Fix timesync startup bugs

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 aaac8390
......@@ -56,12 +56,11 @@ type (
}
DataNode struct {
ctx context.Context
cancel context.CancelFunc
NodeID UniqueID
Role string
State internalpb2.StateCode
watchDm chan struct{}
ctx context.Context
cancel context.CancelFunc
NodeID UniqueID
Role string
State internalpb2.StateCode
dataSyncService *dataSyncService
metaService *metaService
......@@ -82,13 +81,11 @@ func NewDataNode(ctx context.Context) *DataNode {
Params.Init()
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
ctx: ctx2,
cancel: cancel2,
NodeID: Params.NodeID, // GOOSE TODO: How to init
Role: typeutil.DataNodeRole,
State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
watchDm: make(chan struct{}),
ctx: ctx2,
cancel: cancel2,
NodeID: Params.NodeID, // GOOSE TODO: How to init
Role: typeutil.DataNodeRole,
State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
dataSyncService: nil,
metaService: nil,
masterService: nil,
......@@ -138,13 +135,6 @@ func (node *DataNode) Init() error {
return errors.Errorf("Register node failed: %v", err)
}
select {
case <-time.After(RPCConnectionTimeout):
return errors.New("Get DmChannels failed in 30 seconds")
case <-node.watchDm:
log.Println("insert channel names set")
}
for _, kv := range resp.InitParams.StartParams {
switch kv.Key {
case "DDChannelName":
......@@ -172,10 +162,10 @@ func (node *DataNode) Init() error {
node.flushChan = make(chan *flushMsg, chanSize)
node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc)
node.dataSyncService.init()
node.metaService = newMetaService(node.ctx, replica, node.masterService)
node.replica = replica
node.dataSyncService.initNodes()
// --- Opentracing ---
cfg := &config.Configuration{
......@@ -201,9 +191,14 @@ func (node *DataNode) Init() error {
func (node *DataNode) Start() error {
node.metaService.init()
return nil
}
// DataNode is HEALTHY until StartSync() is called
func (node *DataNode) StartSync() {
node.dataSyncService.init()
go node.dataSyncService.start()
node.State = internalpb2.StateCode_HEALTHY
return nil
}
func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
......@@ -224,7 +219,7 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common
default:
Params.InsertChannelNames = in.GetChannelNames()
status.ErrorCode = commonpb.ErrorCode_SUCCESS
node.watchDm <- struct{}{}
node.StartSync()
return status, nil
}
}
......
......@@ -94,6 +94,7 @@ type (
ddChannelName string
segmentInfoStream msgstream.MsgStream
insertChannels []string
ttBarrier timesync.TimeTickBarrier
}
)
......@@ -177,23 +178,23 @@ func (s *Server) initSegmentInfoChannel() {
s.segmentInfoStream.Start()
}
func (s *Server) initMsgProducer() error {
var err error
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
ttMsgStream, _ := factory.NewMsgStream(s.ctx)
ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
s.ttMsgStream = ttMsgStream
if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
return err
}
s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
s.ttMsgStream.Start()
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
k2sStream, _ := factory.NewMsgStream(s.ctx)
k2sStream.AsProducer(Params.K2SChannelNames)
s.k2sMsgStream = k2sStream
if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
return err
}
s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
s.k2sMsgStream.Start()
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher)
if err != nil {
if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
return err
}
s.msgProducer = producer
s.msgProducer.Start(s.ctx)
return nil
}
......@@ -297,10 +298,11 @@ func (s *Server) checkMasterIsHealthy() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(3)
s.serverLoopWg.Add(4)
go s.startStatsChannel(s.serverLoopCtx)
go s.startSegmentFlushChannel(s.serverLoopCtx)
go s.startDDChannel(s.serverLoopCtx)
go s.startTTBarrier(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
......@@ -388,6 +390,12 @@ func (s *Server) startDDChannel(ctx context.Context) {
}
}
func (s *Server) startTTBarrier(ctx context.Context) {
defer s.serverLoopWg.Done()
s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
s.ttBarrier.StartBackgroundLoop()
}
func (s *Server) waitDataNodeRegister() {
log.Println("waiting data node to register")
<-s.registerFinishCh
......
......@@ -76,6 +76,7 @@ func (s *Server) Start() error {
func (s *Server) Stop() error {
err := s.core.Stop()
s.cancel()
s.grpcServer.GracefulStop()
return err
}
......
......@@ -35,7 +35,7 @@ type Server struct {
func (s *Server) Init() error {
log.Println("query service init")
if err := s.queryService.Init(); err != nil {
return err
panic(err)
}
s.queryService.SetEnableGrpc(true)
return nil
......
......@@ -124,7 +124,7 @@ message LoadSegmentRequest {
int64 partitionID = 4;
repeated int64 segmentIDs = 5;
repeated int64 fieldIDs = 6;
repeated data.SegmentStateInfo segment_states = 7;
data.SegmentStateInfo last_segment_state = 7;
}
message ReleaseSegmentRequest {
......
......@@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string {
}
type LoadSegmentRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
SegmentStates []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
LastSegmentState *datapb.SegmentStateInfo `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadSegmentRequest) Reset() { *m = LoadSegmentRequest{} }
......@@ -1051,9 +1051,9 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 {
return nil
}
func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo {
func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStateInfo {
if m != nil {
return m.SegmentStates
return m.LastSegmentState
}
return nil
}
......@@ -1395,90 +1395,91 @@ func init() {
func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) }
var fileDescriptor_5fcb6756dc1afb8d = []byte{
// 1324 bytes of a gzipped FileDescriptorProto
// 1331 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x5f, 0x73, 0xdb, 0x44,
0x10, 0xb7, 0x6c, 0xc7, 0xa9, 0x37, 0xae, 0xed, 0x5e, 0xfe, 0x19, 0x51, 0x4a, 0x39, 0xa0, 0x4d,
0x5b, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x69, 0xe3, 0x4e, 0xc6, 0x0c, 0x0d, 0x41, 0x4e, 0xa7,
0x43, 0xa0, 0x63, 0x64, 0xe9, 0xe2, 0x5c, 0x6b, 0x49, 0xae, 0xee, 0x9c, 0x34, 0x79, 0x01, 0x66,
0x78, 0x64, 0xe0, 0x33, 0x30, 0x30, 0xc0, 0xf0, 0x81, 0x78, 0xe1, 0x05, 0xbe, 0x09, 0xa3, 0x93,
0xac, 0x48, 0xf2, 0x39, 0x72, 0xea, 0xa6, 0xe1, 0x4d, 0x77, 0xda, 0xdb, 0xdf, 0xee, 0x6f, 0xf7,
0xf6, 0x76, 0x61, 0xfe, 0xe9, 0x80, 0xb8, 0x87, 0x6d, 0x46, 0xdc, 0x7d, 0x6a, 0x90, 0x7a, 0xdf,
0x75, 0xb8, 0x83, 0x90, 0x45, 0x7b, 0xfb, 0x03, 0xe6, 0xaf, 0xea, 0x42, 0x42, 0x2d, 0x19, 0x8e,
0x65, 0x39, 0xb6, 0xbf, 0xa7, 0x96, 0xa2, 0x12, 0x6a, 0x99, 0xda, 0x9c, 0xb8, 0xb6, 0xde, 0x0b,
0xd6, 0xc8, 0xd4, 0xb9, 0x1e, 0xd7, 0x89, 0xbf, 0x81, 0x79, 0x8d, 0x74, 0x29, 0xe3, 0xc4, 0xdd,
0x74, 0x4c, 0xa2, 0x91, 0xa7, 0x03, 0xc2, 0x38, 0x7a, 0x0f, 0xf2, 0x1d, 0x9d, 0x91, 0x9a, 0x72,
0x55, 0x59, 0x99, 0x5b, 0xbb, 0x5c, 0x8f, 0x21, 0x07, 0x90, 0xf7, 0x59, 0xf7, 0xae, 0xce, 0x88,
0x26, 0x24, 0xd1, 0x07, 0x30, 0xab, 0x9b, 0xa6, 0x4b, 0x18, 0xab, 0x65, 0x4f, 0x38, 0x74, 0xc7,
0x97, 0xd1, 0x86, 0xc2, 0xf8, 0x27, 0x05, 0x16, 0xe2, 0x16, 0xb0, 0xbe, 0x63, 0x33, 0x82, 0x6e,
0x43, 0x81, 0x71, 0x9d, 0x0f, 0x58, 0x60, 0xc4, 0xab, 0x52, 0x7d, 0x2d, 0x21, 0xa2, 0x05, 0xa2,
0xe8, 0x2e, 0xcc, 0x51, 0x9b, 0xf2, 0x76, 0x5f, 0x77, 0x75, 0x6b, 0x68, 0xc9, 0x1b, 0xf1, 0x93,
0x21, 0x2b, 0x4d, 0x9b, 0xf2, 0x2d, 0x21, 0xa8, 0x01, 0x0d, 0xbf, 0xf1, 0x23, 0x58, 0x6c, 0xed,
0x39, 0x07, 0xeb, 0x4e, 0xaf, 0x47, 0x0c, 0x4e, 0x1d, 0xfb, 0xf9, 0x49, 0x41, 0x90, 0x37, 0x3b,
0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x89, 0x6f, 0xcc, 0x60, 0x29, 0xa9, 0x7e, 0x1a, 0x8f, 0xdf, 0x82,
0x8b, 0x46, 0xa8, 0xaa, 0xd9, 0xf0, 0x7c, 0xce, 0xad, 0xe4, 0xb4, 0xf8, 0x26, 0xfe, 0x4e, 0x81,
0xc5, 0x4f, 0x1d, 0xdd, 0x3c, 0x23, 0xa7, 0x10, 0x86, 0x52, 0x14, 0xb0, 0x96, 0x13, 0xff, 0x62,
0x7b, 0xf8, 0x7b, 0x05, 0x6a, 0x1a, 0xe9, 0x11, 0x9d, 0x91, 0xf3, 0x34, 0xe3, 0x5b, 0x05, 0x16,
0xbc, 0x00, 0x6c, 0xe9, 0x2e, 0xa7, 0xe7, 0x63, 0x42, 0xdf, 0xcf, 0xb0, 0x88, 0x05, 0xd3, 0x64,
0x00, 0x86, 0x52, 0x7f, 0xa8, 0xe9, 0x38, 0x01, 0x62, 0x7b, 0xd8, 0x82, 0x4a, 0x88, 0xe6, 0x1d,
0x27, 0x0c, 0x5d, 0x85, 0xb9, 0x88, 0x88, 0x00, 0xcc, 0x69, 0xd1, 0x2d, 0xf4, 0x21, 0xcc, 0x78,
0x10, 0x44, 0xf8, 0x57, 0x5e, 0xc3, 0xf5, 0xd1, 0xfa, 0x53, 0x8f, 0x6b, 0xd5, 0xfc, 0x03, 0xf8,
0x37, 0x05, 0x96, 0x12, 0x78, 0x2f, 0x9d, 0xe5, 0x11, 0x5e, 0xf2, 0x12, 0x5e, 0xfe, 0x54, 0x60,
0x79, 0xc4, 0xd0, 0x69, 0x82, 0xb1, 0x03, 0x4b, 0x21, 0x40, 0xdb, 0x24, 0xcc, 0x70, 0x69, 0xdf,
0xfb, 0xf6, 0xc3, 0x32, 0xb7, 0xf6, 0x66, 0x3a, 0x89, 0x4c, 0x5b, 0x0c, 0x55, 0x34, 0x22, 0x1a,
0xf0, 0xaf, 0x0a, 0x2c, 0x78, 0x97, 0xf8, 0xfc, 0x32, 0x77, 0x22, 0x4e, 0x7f, 0x57, 0x60, 0x39,
0xb8, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x56, 0x40, 0x5d, 0x77, 0x89, 0xce, 0xc9, 0xe7, 0x5e, 0x1c,
0xd6, 0xf7, 0x74, 0xdb, 0x26, 0xbd, 0xe9, 0x12, 0xe0, 0x3a, 0x54, 0x5c, 0xdf, 0xd9, 0xb6, 0xe1,
0xeb, 0x13, 0xa6, 0x17, 0xb5, 0x72, 0xb0, 0x1d, 0xa0, 0xa0, 0xb7, 0xa1, 0xec, 0x12, 0x36, 0xe8,
0x1d, 0xcb, 0xe5, 0x84, 0xdc, 0x45, 0x7f, 0x37, 0x10, 0xc3, 0xbf, 0x28, 0xb0, 0x7c, 0xc7, 0x34,
0xa3, 0x06, 0x4e, 0x71, 0x97, 0x6e, 0xc1, 0xa5, 0x84, 0x75, 0x01, 0xb5, 0x45, 0xad, 0x1a, 0xb7,
0xaf, 0xd9, 0x40, 0x37, 0xa0, 0x1a, 0xb7, 0x30, 0xa0, 0xba, 0xa8, 0x55, 0x62, 0x36, 0x36, 0x1b,
0xf8, 0x6f, 0x05, 0x54, 0x8d, 0x58, 0xce, 0x3e, 0x91, 0x1a, 0xfa, 0x5c, 0x4c, 0x0e, 0xbd, 0xcb,
0x4e, 0xe7, 0x5d, 0xee, 0x14, 0xde, 0xe5, 0xe5, 0xde, 0x3d, 0x86, 0xa5, 0x87, 0x3a, 0x37, 0xf6,
0x1a, 0xd6, 0xf4, 0x11, 0xb8, 0x02, 0x10, 0xe2, 0xf9, 0x45, 0xa1, 0xa8, 0x45, 0x76, 0xf0, 0x1f,
0x59, 0x40, 0xde, 0x25, 0x6f, 0x91, 0xae, 0x45, 0x6c, 0xfe, 0xf2, 0x2f, 0x4e, 0xe2, 0x5d, 0xc8,
0x8f, 0xbe, 0x0b, 0x57, 0x00, 0x98, 0x6f, 0x9d, 0xe7, 0xc2, 0x8c, 0xb8, 0x58, 0x91, 0x1d, 0xa4,
0xc2, 0x85, 0x5d, 0x4a, 0x7a, 0xa6, 0xf7, 0xb7, 0x20, 0xfe, 0x86, 0x6b, 0xf4, 0x09, 0x94, 0x03,
0xc9, 0xb6, 0x78, 0x2a, 0x58, 0x6d, 0x56, 0x56, 0x17, 0xbd, 0x4e, 0xb5, 0x1e, 0x50, 0x20, 0x8a,
0x62, 0xd3, 0xde, 0x75, 0xb4, 0x8b, 0x2c, 0xb2, 0xc3, 0xf0, 0xbf, 0x0a, 0x2c, 0x06, 0x85, 0xe6,
0xdc, 0xd8, 0x9a, 0xa0, 0xcc, 0x4c, 0xc3, 0x17, 0xfe, 0x51, 0x81, 0xe5, 0x75, 0xc7, 0xea, 0x3b,
0x76, 0xe8, 0xf7, 0x74, 0xf5, 0xe9, 0x23, 0xff, 0x10, 0x19, 0x36, 0xc7, 0xd7, 0xc6, 0x34, 0xc7,
0x49, 0xd0, 0xe0, 0x14, 0xfe, 0x47, 0x81, 0xb9, 0x80, 0x6d, 0x2f, 0x26, 0xe8, 0x32, 0x14, 0x43,
0x57, 0x82, 0x26, 0xe2, 0x78, 0x63, 0x84, 0xc2, 0x6c, 0x7a, 0xc2, 0xe5, 0x46, 0x13, 0xee, 0x15,
0xb8, 0x60, 0x11, 0xab, 0xcd, 0xe8, 0x11, 0x09, 0xf2, 0x71, 0xd6, 0x22, 0x56, 0x8b, 0x1e, 0x11,
0xef, 0x97, 0x3d, 0xb0, 0xda, 0xae, 0x73, 0xe0, 0x31, 0x2b, 0x7e, 0xd9, 0x03, 0x4b, 0x73, 0x0e,
0x18, 0x7a, 0x0d, 0x80, 0xda, 0x26, 0x79, 0xd6, 0xb6, 0x75, 0x8b, 0xd4, 0x0a, 0xe2, 0x6a, 0x17,
0xc5, 0xce, 0xa6, 0x6e, 0x11, 0x54, 0x83, 0x59, 0xb1, 0x68, 0x36, 0x6a, 0xb3, 0xfe, 0xc1, 0x60,
0x89, 0x77, 0x01, 0x45, 0x3c, 0x9c, 0xea, 0xaa, 0x47, 0xe2, 0x9e, 0x4d, 0xc6, 0xdd, 0x6b, 0xca,
0xe7, 0x63, 0x40, 0xd3, 0xc4, 0xf5, 0x7d, 0x98, 0xa1, 0xf6, 0xae, 0x33, 0xec, 0x33, 0x5e, 0x97,
0xf5, 0x19, 0x51, 0x30, 0x5f, 0xfa, 0xe6, 0x11, 0x94, 0xe3, 0xdd, 0x07, 0x2a, 0xc1, 0x85, 0x4d,
0x87, 0xdf, 0x7b, 0x46, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xd3, 0xe1, 0x5b, 0x2e, 0x61, 0xc4,
0xe6, 0x55, 0x05, 0x01, 0x14, 0x3e, 0xb3, 0x1b, 0x94, 0x3d, 0xa9, 0x66, 0xd1, 0x7c, 0xd0, 0x54,
0xea, 0xbd, 0xa6, 0x7d, 0x9f, 0x58, 0x8e, 0x7b, 0x58, 0xcd, 0x79, 0xc7, 0xc3, 0x55, 0x1e, 0x55,
0xa1, 0x14, 0x8a, 0x6c, 0x6c, 0x3d, 0xa8, 0xce, 0xa0, 0x22, 0xcc, 0xf8, 0x9f, 0x85, 0xb5, 0x1f,
0x00, 0x4a, 0xe2, 0xb9, 0x68, 0xf9, 0x23, 0x29, 0x32, 0xa0, 0x14, 0x1d, 0x05, 0xd1, 0x75, 0x99,
0x13, 0x92, 0x71, 0x55, 0x5d, 0x49, 0x17, 0xf4, 0xb9, 0xc5, 0x19, 0xf4, 0x18, 0x2a, 0xf1, 0xf9,
0x8b, 0xa1, 0x1b, 0x52, 0xb2, 0x64, 0x33, 0xa0, 0x7a, 0x73, 0x12, 0xd1, 0x10, 0xab, 0x0b, 0xe5,
0x58, 0xa3, 0xcf, 0xd0, 0xca, 0xb8, 0xf3, 0xc9, 0x56, 0x49, 0xbd, 0x31, 0x81, 0x64, 0x08, 0xf4,
0x05, 0x94, 0x63, 0x9d, 0xe1, 0x18, 0x20, 0x59, 0xf7, 0xa8, 0x9e, 0x94, 0x5e, 0x38, 0x83, 0xda,
0x70, 0x29, 0xd9, 0xcd, 0x31, 0x74, 0x4b, 0x4e, 0xb8, 0xb4, 0xe9, 0x4b, 0x03, 0xd8, 0xf1, 0x6d,
0x3f, 0x26, 0x50, 0x1e, 0x0f, 0xe9, 0xf8, 0x9a, 0xa6, 0xfb, 0xeb, 0xd0, 0xf8, 0x88, 0xfa, 0x77,
0x4e, 0x30, 0xfe, 0xd4, 0x08, 0x1d, 0x40, 0xa3, 0x2d, 0x24, 0x52, 0xa5, 0x87, 0xee, 0x59, 0x7d,
0x7e, 0xa8, 0xd6, 0x65, 0xf0, 0xe3, 0xdb, 0x50, 0x9c, 0x41, 0x0f, 0x01, 0x6d, 0x10, 0xbe, 0x4d,
0x2d, 0xb2, 0x4d, 0x8d, 0x27, 0x93, 0x60, 0x24, 0x9e, 0xd3, 0x60, 0xd1, 0xe2, 0x2e, 0xb5, 0xbb,
0xb1, 0xb4, 0x59, 0xd8, 0x20, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, 0xc1, 0x5e, 0xa0, 0x6a, 0x47, 0xd8,
0x9c, 0x1c, 0x3a, 0x6f, 0x4e, 0x32, 0xfe, 0x04, 0xc4, 0xdf, 0x9a, 0x48, 0x36, 0x04, 0xdc, 0x11,
0x80, 0x89, 0x67, 0xeb, 0x44, 0x4f, 0x26, 0x7c, 0xfa, 0x70, 0x06, 0x19, 0x50, 0xf6, 0x78, 0x8a,
0x3c, 0x7b, 0xd7, 0xd2, 0xea, 0x6b, 0xe0, 0xc4, 0xf5, 0x54, 0xb9, 0xa1, 0x03, 0x6b, 0x7f, 0x15,
0xa0, 0x28, 0x12, 0x40, 0xd4, 0xbe, 0x33, 0x8b, 0xf9, 0x36, 0x54, 0x82, 0x98, 0xbf, 0xc8, 0x70,
0xb7, 0x4f, 0xcd, 0xbe, 0x34, 0xbc, 0x63, 0x5a, 0x1d, 0x9c, 0x41, 0x8f, 0xa0, 0x92, 0x18, 0x83,
0xe4, 0x45, 0x68, 0xcc, 0xac, 0x94, 0x76, 0x8d, 0x0d, 0x40, 0xa3, 0xf3, 0x0b, 0xaa, 0xcb, 0x2b,
0xc5, 0xb8, 0x39, 0x27, 0x0d, 0xe4, 0x2b, 0xa8, 0x24, 0xe6, 0x08, 0xf9, 0x85, 0x90, 0x0f, 0x1b,
0x69, 0xda, 0x1f, 0x40, 0x29, 0x32, 0x38, 0x30, 0x79, 0x8a, 0x8e, 0x8e, 0x16, 0x69, 0x6a, 0xbf,
0x84, 0x4a, 0xbc, 0xc9, 0x1e, 0xf3, 0x5e, 0x4a, 0x3b, 0xf1, 0x74, 0xda, 0xcf, 0xfe, 0x62, 0xdd,
0xbd, 0xb3, 0xf3, 0x71, 0x97, 0xf2, 0xbd, 0x41, 0xc7, 0x83, 0x5f, 0x3d, 0xa2, 0xbd, 0x1e, 0x3d,
0xe2, 0xc4, 0xd8, 0x5b, 0xf5, 0x35, 0xbc, 0x6b, 0x52, 0xc6, 0x5d, 0xda, 0x19, 0x70, 0x62, 0xae,
0x0e, 0x8b, 0xc0, 0xaa, 0x50, 0xbb, 0x2a, 0xd4, 0xf6, 0x3b, 0x9d, 0x82, 0x58, 0xde, 0xfe, 0x2f,
0x00, 0x00, 0xff, 0xff, 0x36, 0xce, 0x45, 0x9b, 0x98, 0x17, 0x00, 0x00,
0x13, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x49, 0xe2, 0x4e, 0xc6, 0x33, 0x34, 0xa4, 0x72, 0x3a,
0x1d, 0x02, 0x1d, 0x23, 0x4b, 0x17, 0xe7, 0x5a, 0xfd, 0x71, 0x75, 0xe7, 0xa4, 0xc9, 0x0b, 0x30,
0xc3, 0x23, 0x03, 0x9f, 0x81, 0x81, 0x81, 0x19, 0x5e, 0xf8, 0x36, 0xbc, 0xf0, 0x02, 0xdf, 0x84,
0xd1, 0x49, 0x56, 0x24, 0x59, 0x8e, 0x9c, 0xba, 0x69, 0x78, 0xd3, 0x9d, 0xf6, 0xf6, 0xb7, 0xfb,
0xdb, 0xbd, 0xbd, 0x5d, 0x98, 0x7d, 0xd6, 0x27, 0xce, 0x71, 0x9b, 0x11, 0xe7, 0x90, 0x6a, 0xa4,
0xde, 0x73, 0x6c, 0x6e, 0x23, 0x64, 0x52, 0xe3, 0xb0, 0xcf, 0xbc, 0x55, 0x5d, 0x48, 0xc8, 0x25,
0xcd, 0x36, 0x4d, 0xdb, 0xf2, 0xf6, 0xe4, 0x52, 0x58, 0x42, 0x2e, 0x53, 0x8b, 0x13, 0xc7, 0x52,
0x0d, 0x7f, 0x8d, 0x74, 0x95, 0xab, 0x51, 0x9d, 0xf8, 0x1b, 0x98, 0x55, 0x48, 0x97, 0x32, 0x4e,
0x9c, 0x6d, 0x5b, 0x27, 0x0a, 0x79, 0xd6, 0x27, 0x8c, 0xa3, 0x0f, 0x20, 0xdf, 0x51, 0x19, 0xa9,
0x49, 0x37, 0xa5, 0xa5, 0x99, 0xb5, 0xeb, 0xf5, 0x08, 0xb2, 0x0f, 0x79, 0x9f, 0x75, 0x37, 0x54,
0x46, 0x14, 0x21, 0x89, 0x3e, 0x82, 0x69, 0x55, 0xd7, 0x1d, 0xc2, 0x58, 0x2d, 0x7b, 0xc6, 0xa1,
0x75, 0x4f, 0x46, 0x19, 0x08, 0xe3, 0x9f, 0x24, 0x98, 0x8b, 0x5a, 0xc0, 0x7a, 0xb6, 0xc5, 0x08,
0xba, 0x0b, 0x05, 0xc6, 0x55, 0xde, 0x67, 0xbe, 0x11, 0xaf, 0x27, 0xea, 0x6b, 0x09, 0x11, 0xc5,
0x17, 0x45, 0x1b, 0x30, 0x43, 0x2d, 0xca, 0xdb, 0x3d, 0xd5, 0x51, 0xcd, 0x81, 0x25, 0x6f, 0x45,
0x4f, 0x06, 0xac, 0x34, 0x2d, 0xca, 0x77, 0x84, 0xa0, 0x02, 0x34, 0xf8, 0xc6, 0x8f, 0x61, 0xbe,
0x75, 0x60, 0x1f, 0x6d, 0xda, 0x86, 0x41, 0x34, 0x4e, 0x6d, 0xeb, 0xc5, 0x49, 0x41, 0x90, 0xd7,
0x3b, 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x88, 0x6f, 0xcc, 0x60, 0x21, 0xae, 0x7e, 0x12, 0x8f, 0xdf,
0x81, 0xab, 0x5a, 0xa0, 0xaa, 0xd9, 0x70, 0x7d, 0xce, 0x2d, 0xe5, 0x94, 0xe8, 0x26, 0xfe, 0x4e,
0x82, 0xf9, 0xcf, 0x6c, 0x55, 0xbf, 0x20, 0xa7, 0x10, 0x86, 0x52, 0x18, 0xb0, 0x96, 0x13, 0xff,
0x22, 0x7b, 0xf8, 0x7b, 0x09, 0x6a, 0x0a, 0x31, 0x88, 0xca, 0xc8, 0x65, 0x9a, 0xf1, 0xad, 0x04,
0x73, 0x6e, 0x00, 0x76, 0x54, 0x87, 0xd3, 0xcb, 0x31, 0xa1, 0xe7, 0x65, 0x58, 0xc8, 0x82, 0x49,
0x32, 0x00, 0x43, 0xa9, 0x37, 0xd0, 0x74, 0x9a, 0x00, 0x91, 0x3d, 0x6c, 0x42, 0x25, 0x40, 0x73,
0x8f, 0x13, 0x86, 0x6e, 0xc2, 0x4c, 0x48, 0x44, 0x00, 0xe6, 0x94, 0xf0, 0x16, 0xfa, 0x18, 0xa6,
0x5c, 0x08, 0x22, 0xfc, 0x2b, 0xaf, 0xe1, 0xfa, 0x70, 0xfd, 0xa9, 0x47, 0xb5, 0x2a, 0xde, 0x01,
0xfc, 0x9b, 0x04, 0x0b, 0x31, 0xbc, 0x57, 0xce, 0xf2, 0x10, 0x2f, 0xf9, 0x04, 0x5e, 0xfe, 0x90,
0x60, 0x71, 0xc8, 0xd0, 0x49, 0x82, 0xb1, 0x07, 0x0b, 0x01, 0x40, 0x5b, 0x27, 0x4c, 0x73, 0x68,
0xcf, 0xfd, 0xf6, 0xc2, 0x32, 0xb3, 0xf6, 0x76, 0x3a, 0x89, 0x4c, 0x99, 0x0f, 0x54, 0x34, 0x42,
0x1a, 0xf0, 0xaf, 0x12, 0xcc, 0xb9, 0x97, 0xf8, 0xf2, 0x32, 0x77, 0x2c, 0x4e, 0x7f, 0x97, 0x60,
0xd1, 0xbf, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x96, 0x40, 0xde, 0x74, 0x88, 0xca, 0xc9, 0x03, 0x37,
0x0e, 0x9b, 0x07, 0xaa, 0x65, 0x11, 0x63, 0xb2, 0x04, 0xb8, 0x0d, 0x15, 0xc7, 0x73, 0xb6, 0xad,
0x79, 0xfa, 0x84, 0xe9, 0x45, 0xa5, 0xec, 0x6f, 0xfb, 0x28, 0xe8, 0x5d, 0x28, 0x3b, 0x84, 0xf5,
0x8d, 0x53, 0xb9, 0x9c, 0x90, 0xbb, 0xea, 0xed, 0xfa, 0x62, 0xf8, 0x17, 0x09, 0x16, 0xd7, 0x75,
0x3d, 0x6c, 0xe0, 0x04, 0x77, 0x69, 0x05, 0xae, 0xc5, 0xac, 0xf3, 0xa9, 0x2d, 0x2a, 0xd5, 0xa8,
0x7d, 0xcd, 0x06, 0xba, 0x03, 0xd5, 0xa8, 0x85, 0x3e, 0xd5, 0x45, 0xa5, 0x12, 0xb1, 0xb1, 0xd9,
0xc0, 0x7f, 0x4b, 0x20, 0x2b, 0xc4, 0xb4, 0x0f, 0x49, 0xa2, 0xa1, 0x2f, 0xc4, 0xe4, 0xc0, 0xbb,
0xec, 0x64, 0xde, 0xe5, 0xce, 0xe1, 0x5d, 0x3e, 0xd9, 0xbb, 0x27, 0xb0, 0xf0, 0x48, 0xe5, 0xda,
0x41, 0xc3, 0x9c, 0x3c, 0x02, 0x37, 0x00, 0x02, 0x3c, 0xaf, 0x28, 0x14, 0x95, 0xd0, 0x0e, 0xfe,
0x33, 0x0b, 0xc8, 0xbd, 0xe4, 0x2d, 0xd2, 0x35, 0x89, 0xc5, 0x5f, 0xfd, 0xc5, 0x89, 0xbd, 0x0b,
0xf9, 0xe1, 0x77, 0xe1, 0x06, 0x00, 0xf3, 0xac, 0x73, 0x5d, 0x98, 0x12, 0x17, 0x2b, 0xb4, 0x83,
0x64, 0xb8, 0xb2, 0x4f, 0x89, 0xa1, 0xbb, 0x7f, 0x0b, 0xe2, 0x6f, 0xb0, 0x46, 0x0f, 0x00, 0x19,
0x2a, 0xe3, 0x6d, 0x5f, 0xbc, 0xed, 0x3d, 0x30, 0xd3, 0xc2, 0xab, 0x58, 0x6d, 0x74, 0xbb, 0xd5,
0xba, 0x4f, 0x83, 0x28, 0x8c, 0x4d, 0x6b, 0xdf, 0x56, 0xaa, 0xee, 0xf1, 0xf0, 0x2e, 0xfe, 0x57,
0x82, 0x79, 0xbf, 0xde, 0x5c, 0x1a, 0x69, 0x63, 0x54, 0x9b, 0x49, 0x68, 0xc3, 0x3f, 0x4a, 0xb0,
0xb8, 0x69, 0x9b, 0x3d, 0xdb, 0x1a, 0xb8, 0x3d, 0xe1, 0x3b, 0xf5, 0x89, 0x77, 0x88, 0x0c, 0x7a,
0xe4, 0x5b, 0x23, 0x7a, 0xe4, 0x38, 0xa8, 0x7f, 0x0a, 0xff, 0x23, 0xc1, 0x8c, 0xcf, 0xb6, 0x1b,
0x16, 0x74, 0x1d, 0x8a, 0x81, 0x2b, 0x7e, 0x2f, 0x71, 0xba, 0x31, 0x44, 0x61, 0x36, 0x3d, 0xef,
0x72, 0xc3, 0x79, 0xf7, 0x1a, 0x5c, 0x31, 0x89, 0xd9, 0x66, 0xf4, 0x84, 0xf8, 0x69, 0x39, 0x6d,
0x12, 0xb3, 0x45, 0x4f, 0x88, 0xfb, 0xcb, 0xea, 0x9b, 0x6d, 0xc7, 0x3e, 0x72, 0x99, 0x15, 0xbf,
0xac, 0xbe, 0xa9, 0xd8, 0x47, 0x0c, 0xbd, 0x01, 0x40, 0x2d, 0x9d, 0x3c, 0x6f, 0x5b, 0xaa, 0x49,
0x6a, 0x05, 0x71, 0xc3, 0x8b, 0x62, 0x67, 0x5b, 0x35, 0x09, 0xaa, 0xc1, 0xb4, 0x58, 0x34, 0x1b,
0x22, 0x0b, 0x73, 0xca, 0x60, 0x89, 0xf7, 0x01, 0x85, 0x3c, 0x9c, 0xe8, 0xc6, 0x87, 0xe2, 0x9e,
0x8d, 0xc7, 0xdd, 0xed, 0xcd, 0x67, 0x23, 0x40, 0x93, 0xc4, 0xf5, 0x43, 0x98, 0xa2, 0xd6, 0xbe,
0x3d, 0x68, 0x37, 0xde, 0x4c, 0x6a, 0x37, 0xc2, 0x60, 0x9e, 0xf4, 0xf2, 0x09, 0x94, 0xa3, 0x4d,
0x08, 0x2a, 0xc1, 0x95, 0x6d, 0x9b, 0xdf, 0x7b, 0x4e, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xdb,
0xe6, 0x3b, 0x0e, 0x61, 0xc4, 0xe2, 0x55, 0x09, 0x01, 0x14, 0x3e, 0xb7, 0x1a, 0x94, 0x3d, 0xad,
0x66, 0xd1, 0xac, 0xdf, 0x5b, 0xaa, 0x46, 0xd3, 0xba, 0x4f, 0x4c, 0xdb, 0x39, 0xae, 0xe6, 0xdc,
0xe3, 0xc1, 0x2a, 0x8f, 0xaa, 0x50, 0x0a, 0x44, 0xb6, 0x76, 0x1e, 0x56, 0xa7, 0x50, 0x11, 0xa6,
0xbc, 0xcf, 0xc2, 0xda, 0x0f, 0x00, 0x25, 0xf1, 0x6a, 0xb4, 0xbc, 0xc9, 0x14, 0x69, 0x50, 0x0a,
0x4f, 0x84, 0xe8, 0x76, 0x92, 0x13, 0x09, 0x53, 0xab, 0xbc, 0x94, 0x2e, 0xe8, 0x71, 0x8b, 0x33,
0xe8, 0x09, 0x54, 0xa2, 0x63, 0x18, 0x43, 0x77, 0x12, 0xc9, 0x4a, 0x1a, 0x05, 0xe5, 0xe5, 0x71,
0x44, 0x03, 0xac, 0x2e, 0x94, 0x23, 0xfd, 0x3e, 0x43, 0x4b, 0xa3, 0xce, 0xc7, 0x3b, 0x26, 0xf9,
0xce, 0x18, 0x92, 0x01, 0xd0, 0x17, 0x50, 0x8e, 0x34, 0x88, 0x23, 0x80, 0x92, 0x9a, 0x48, 0xf9,
0xac, 0xf4, 0xc2, 0x19, 0xd4, 0x86, 0x6b, 0xf1, 0xa6, 0x8e, 0xa1, 0x95, 0x64, 0xc2, 0x13, 0x7b,
0xbf, 0x34, 0x80, 0x3d, 0xcf, 0xf6, 0x53, 0x02, 0x93, 0xe3, 0x91, 0x38, 0xc5, 0xa6, 0xe9, 0xfe,
0x3a, 0x30, 0x3e, 0xa4, 0xfe, 0xbd, 0x33, 0x8c, 0x3f, 0x37, 0x42, 0x07, 0xd0, 0x70, 0x27, 0x89,
0xe4, 0xc4, 0x43, 0xf7, 0xcc, 0x1e, 0x3f, 0x96, 0xeb, 0x49, 0xf0, 0xa3, 0xbb, 0x51, 0x9c, 0x41,
0x8f, 0x00, 0x6d, 0x11, 0xbe, 0x4b, 0x4d, 0xb2, 0x4b, 0xb5, 0xa7, 0xe3, 0x60, 0xc4, 0x5e, 0x54,
0x7f, 0xd1, 0xe2, 0x0e, 0xb5, 0xba, 0x91, 0xb4, 0x99, 0xdb, 0x22, 0xa2, 0xc2, 0x53, 0xc6, 0xa9,
0xc6, 0x5e, 0xa2, 0x6a, 0x5b, 0xd8, 0x1c, 0x9f, 0x3d, 0x97, 0xc7, 0x99, 0x82, 0x7c, 0xe2, 0x57,
0xc6, 0x92, 0x0d, 0x00, 0xf7, 0x04, 0x60, 0xec, 0xd9, 0x3a, 0xd3, 0x93, 0x31, 0x9f, 0x3e, 0x9c,
0x41, 0x1a, 0x94, 0x5d, 0x9e, 0x42, 0xcf, 0xde, 0xad, 0xb4, 0xfa, 0xea, 0x3b, 0x71, 0x3b, 0x55,
0x6e, 0xe0, 0xc0, 0xda, 0x5f, 0x05, 0x28, 0x8a, 0x04, 0x10, 0xb5, 0xef, 0xc2, 0x62, 0xbe, 0x0b,
0x15, 0x3f, 0xe6, 0x2f, 0x33, 0xdc, 0xed, 0x73, 0xb3, 0x9f, 0x18, 0xde, 0x11, 0xad, 0x0e, 0xce,
0xa0, 0xc7, 0x50, 0x89, 0x4d, 0x43, 0xc9, 0x45, 0x68, 0xc4, 0xc8, 0x94, 0x76, 0x8d, 0x35, 0x40,
0xc3, 0x63, 0x0c, 0xaa, 0x27, 0x57, 0x8a, 0x51, 0xe3, 0x4e, 0x1a, 0xc8, 0x57, 0x50, 0x89, 0x8d,
0x13, 0xc9, 0x17, 0x22, 0x79, 0xe6, 0x48, 0xd3, 0xfe, 0x10, 0x4a, 0xa1, 0xf9, 0x81, 0x25, 0xa7,
0xe8, 0xf0, 0x84, 0x91, 0xa6, 0xf6, 0x4b, 0xa8, 0x44, 0x9b, 0xec, 0x11, 0xef, 0x65, 0x62, 0x27,
0x9e, 0x4e, 0xfb, 0xc5, 0x5f, 0xac, 0x8d, 0xf5, 0xbd, 0x4f, 0xbb, 0x94, 0x1f, 0xf4, 0x3b, 0x2e,
0xfc, 0xea, 0x09, 0x35, 0x0c, 0x7a, 0xc2, 0x89, 0x76, 0xb0, 0xea, 0x69, 0x78, 0x5f, 0xa7, 0x8c,
0x3b, 0xb4, 0xd3, 0xe7, 0x44, 0x5f, 0x1d, 0x14, 0x81, 0x55, 0xa1, 0x76, 0x55, 0xa8, 0xed, 0x75,
0x3a, 0x05, 0xb1, 0xbc, 0xfb, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xdf, 0xec, 0xde, 0x9f,
0x17, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
......
......@@ -445,20 +445,18 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
}
// segments are ordered before LoadSegments calling
for i, state := range in.SegmentStates {
if state.State == commonpb.SegmentState_SegmentGrowing {
position := state.StartPosition
err = node.loadService.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
if in.LastSegmentState.State == commonpb.SegmentState_SegmentGrowing {
segmentNum := len(segmentIDs)
position := in.LastSegmentState.StartPosition
err = node.loadService.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
segmentIDs = segmentIDs[:i]
break
return status, err
}
segmentIDs = segmentIDs[:segmentNum-1]
}
err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
......
......@@ -13,9 +13,6 @@ type metaReplica interface {
loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error)
updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error
getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error)
releaseCollection(dbID UniqueID, collectionID UniqueID) error
releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error
addDmChannels(collectionID UniqueID, channels []string) error
}
type segment struct {
......@@ -29,9 +26,9 @@ type partition struct {
}
type collection struct {
id UniqueID
partitions map[UniqueID]*partition
dmChannelNames []string
id UniqueID
partitions map[UniqueID]*partition
node2channel map[int][]string
}
type metaReplicaImpl struct {
......@@ -51,18 +48,18 @@ func newMetaReplica() metaReplica {
}
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
//TODO:: assert dbID = 0 exist
if _, ok := mp.db2collections[dbID]; ok {
partitions := make(map[UniqueID]*partition)
node2channel := make(map[int][]string)
newCollection := &collection{
id: collectionID,
partitions: partitions,
id: collectionID,
partitions: partitions,
node2channel: node2channel,
}
mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection)
return newCollection, nil
}
return nil, errors.New("addCollection: can't find dbID when add collection")
return nil, errors.New("can't find dbID when add collection")
}
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
......@@ -81,7 +78,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
}
}
}
return nil, errors.New("addPartition: can't find collection when add partition")
return nil, errors.New("can't find collection when add partition")
}
func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
......@@ -89,7 +86,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error)
return collections, nil
}
return nil, errors.New("getCollections: can't find collectionID")
return nil, errors.New("can't find collectionID")
}
func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
......@@ -105,7 +102,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) (
}
}
return nil, errors.New("getPartitions: can't find partitionIDs")
return nil, errors.New("can't find partitionIDs")
}
func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
......@@ -122,7 +119,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
}
}
}
return nil, errors.New("getSegments: can't find segmentID")
return nil, errors.New("can't find segmentID")
}
func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
......@@ -130,16 +127,14 @@ func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID)
if collections, err := mp.getCollections(dbID); err == nil {
for _, collection := range collections {
if collectionID == collection.id {
res = collection
return res, nil
}
}
}
if res == nil {
collection, err := mp.addCollection(dbID, collectionID)
} else {
res, err = mp.addCollection(dbID, collectionID)
if err != nil {
return nil, err
}
res = collection
}
return res, nil
}
......@@ -182,7 +177,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
}
}
}
return errors.New("updatePartitionState: update partition state fail")
return errors.New("update partition state fail")
}
func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
......@@ -208,54 +203,3 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
}
return partitionStates, nil
}
func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for i, collection := range collections {
if collectionID == collection.id {
collections = append(collections[:i], collections[i+1:]...)
return nil
}
}
}
return errors.New("releaseCollection: can't find dbID or collectionID")
}
func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
if _, ok := collection.partitions[partitionID]; ok {
delete(collection.partitions, partitionID)
return nil
}
}
}
}
return errors.New("releasePartition: can't find dbID or collectionID or partitionID")
}
func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error {
//TODO :: use dbID
if collections, ok := mp.db2collections[0]; ok {
for _, collection := range collections {
if collectionID == collection.id {
dmChannels := collection.dmChannelNames
for _, channel := range channels {
match := false
for _, existedChannel := range dmChannels {
if channel == existedChannel {
match = true
break
}
}
if !match {
dmChannels = append(dmChannels, channel)
}
}
return nil
}
}
}
return errors.New("addDmChannels: can't find dbID or collectionID")
}
......@@ -8,6 +8,8 @@ import (
type queryNodeInfo struct {
client QueryNodeInterface
insertChannels string
nodeID uint64
segments []UniqueID
dmChannelNames []string
}
......@@ -23,21 +25,3 @@ func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb
func (qn *queryNodeInfo) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return qn.client.GetSegmentInfo(in)
}
func (qn *queryNodeInfo) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
return qn.client.WatchDmChannels(in)
}
func (qn *queryNodeInfo) AddDmChannels(channels []string) {
qn.dmChannelNames = append(qn.dmChannelNames, channels...)
}
func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo {
segments := make([]UniqueID, 0)
dmChannelNames := make([]string, 0)
return &queryNodeInfo{
client: client,
segments: segments,
dmChannelNames: dmChannelNames,
}
}
......@@ -3,6 +3,7 @@ package queryservice
import (
"context"
"fmt"
"log"
"sort"
"strconv"
"sync/atomic"
......@@ -24,7 +25,6 @@ type MasterServiceInterface interface {
type DataServiceInterface interface {
GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
}
type QueryNodeInterface interface {
......@@ -47,7 +47,7 @@ type QueryService struct {
dataServiceClient DataServiceInterface
masterServiceClient MasterServiceInterface
queryNodes map[UniqueID]*queryNodeInfo
queryNodes []*queryNodeInfo
numRegisterNode uint64
numQueryChannel uint64
......@@ -87,7 +87,7 @@ func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, erro
componentStates, err := node.GetComponentStates()
if err != nil {
subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{
NodeID: nodeID,
NodeID: int64(nodeID),
StateCode: internalpb2.StateCode_ABNORMAL,
})
continue
......@@ -111,21 +111,28 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
return Params.StatsChannelName, nil
}
// TODO:: do addWatchDmChannel to query node after registerNode
func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
fmt.Println("register query node =", req.Address)
// TODO:: add mutex
allocatedID := len(qs.queryNodes)
allocatedID := uint64(len(qs.queryNodes))
registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
var node *queryNodeInfo
if qs.enableGrpc {
client := nodeclient.NewClient(registerNodeAddress)
node = newQueryNodeInfo(client)
node = &queryNodeInfo{
client: client,
nodeID: allocatedID,
}
} else {
client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID))
node = newQueryNodeInfo(client)
client := querynode.NewQueryNode(qs.loopCtx, allocatedID)
node = &queryNodeInfo{
client: client,
nodeID: allocatedID,
}
}
qs.queryNodes[UniqueID(allocatedID)] = node
qs.queryNodes = append(qs.queryNodes, node)
//TODO::return init params to queryNode
return &querypb.RegisterNodeResponse{
......@@ -179,20 +186,10 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
if err != nil {
return fn(err), err
}
if len(collection.dmChannelNames) != 0 {
if collection == nil {
return fn(nil), nil
}
channelRequest := datapb.InsertChannelRequest{
DbID: req.DbID,
CollectionID: req.CollectionID,
}
dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if err != nil {
return fn(err), err
}
// get partitionIDs
showPartitionRequest := &milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
......@@ -210,21 +207,6 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
}
partitionIDs := showPartitionResponse.PartitionIDs
if len(partitionIDs) == 0 {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), err
}
}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
return fn(err), err
}
loadPartitionsRequest := &querypb.LoadPartitionRequest{
Base: req.Base,
DbID: dbID,
......@@ -262,14 +244,6 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest)
status, err := qs.ReleasePartitions(releasePartitionRequest)
err = qs.replica.releaseCollection(dbID, collectionID)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, err
}
//TODO:: queryNode cancel subscribe dmChannels
return status, err
}
......@@ -302,40 +276,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
qs.replica.loadPartition(dbID, collectionID, partitionIDs[0])
fn := func(err error) *commonpb.Status {
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
}
if len(partitionIDs) == 0 {
err := errors.New("partitionIDs are empty")
return fn(err), err
}
var collection *collection = nil
var err error
if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil {
return fn(err), err
}
// get segments and load segment to query node
for _, partitionID := range partitionIDs {
partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID)
if err != nil {
return fn(err), err
}
if partition == nil {
return fn(err), nil
}
showSegmentRequest := &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
......@@ -347,110 +297,92 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
if err != nil {
return fn(err), err
}
if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason)
}
segmentIDs := showSegmentResponse.SegmentIDs
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
channel2segs := make(map[string][]UniqueID)
channel2id := make(map[string]int)
//id2channels := make(map[int][]string)
id2segs := make(map[int][]UniqueID)
offset := 0
resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{
SegmentIDs: segmentIDs,
})
if err != nil {
return fn(err), err
log.Fatal("get segment states fail")
}
for _, state := range resp.States {
segmentID := state.SegmentID
segmentStates[segmentID] = state
channelName := state.StartPosition.ChannelName
if _, ok := channel2segs[channelName]; !ok {
segments := make([]UniqueID, 0)
segments = append(segments, segmentID)
channel2segs[channelName] = segments
var flatChannelName string
// channelNames := make([]string, 0)
// for i, str := range state.StartPositions {
// flatChannelName += str.ChannelName
// channelNames = append(channelNames, str.ChannelName)
// if i+1 < len(state.StartPositions) {
// flatChannelName += "/"
// }
// }
if flatChannelName == "" {
log.Fatal("segmentState's channel name is empty")
}
if _, ok := channel2id[flatChannelName]; !ok {
channel2id[flatChannelName] = offset
//id2channels[offset] = channelNames
id2segs[offset] = make([]UniqueID, 0)
id2segs[offset] = append(id2segs[offset], segmentID)
offset++
} else {
channel2segs[channelName] = append(channel2segs[channelName], segmentID)
//TODO::check channel name
id := channel2id[flatChannelName]
id2segs[id] = append(id2segs[id], segmentID)
}
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
for channel, segmentIDs := range channel2segs {
sort.Slice(segmentIDs, func(i, j int) bool {
return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp
})
var channelLoadDone = false
for _, node := range qs.queryNodes {
channels2node := node.dmChannelNames
for _, ch := range channels2node {
if channel == ch {
channelLoadDone = true
for key, value := range id2segs {
sort.Slice(value, func(i, j int) bool { return segmentStates[value[i]].CreateTime < segmentStates[value[j]].CreateTime })
selectedSegs := make([]UniqueID, 0)
for i, v := range value {
if segmentStates[v].State == commonpb.SegmentState_SegmentFlushed {
selectedSegs = append(selectedSegs, v)
} else {
if i > 0 && segmentStates[selectedSegs[i-1]].State != commonpb.SegmentState_SegmentFlushed {
break
}
}
if channelLoadDone {
break
}
}
if !channelLoadDone {
states := make([]*datapb.SegmentStateInfo, 0)
for _, id := range segmentIDs {
states = append(states, segmentStates[id])
}
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
SegmentIDs: segmentIDs,
SegmentStates: states,
}
dmChannels := []string{channel}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
if err != nil {
return fn(err), err
}
queryNode := qs.queryNodes[nodeIDs[0]]
//TODO:: seek when loadSegment may cause more msgs consumed
status, err := queryNode.LoadSegments(loadSegmentRequest)
if err != nil {
return status, err
selectedSegs = append(selectedSegs, v)
}
}
id2segs[key] = selectedSegs
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
}
if len(collection.dmChannelNames) == 0 {
channelRequest := datapb.InsertChannelRequest{
DbID: dbID,
CollectionID: collectionID,
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if err != nil {
return fn(err), err
}
for _, partitionID := range partitionIDs {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), nil
// TODO:: filter channel for query node
for channels, i := range channel2id {
for key, node := range qs.queryNodes {
if channels == node.insertChannels {
statesID := id2segs[i][len(id2segs[i])-1]
//TODO :: should be start position
// position := segmentStates[statesID-1].StartPositions
// segmentStates[statesID].StartPositions = position
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
SegmentIDs: id2segs[i],
LastSegmentState: segmentStates[statesID],
}
status, err := qs.queryNodes[key].LoadSegments(loadSegmentRequest)
if err != nil {
return status, err
}
}
}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
}
if err != nil {
return fn(err), err
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
......@@ -475,13 +407,6 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
}
segmentIDs = append(segmentIDs, res...)
err = qs.replica.releasePartition(dbID, collectionID, partitionID)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, err
}
}
releaseSegmentRequest := &querypb.ReleaseSegmentRequest{
Base: req.Base,
......@@ -498,7 +423,6 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
}
}
//TODO:: queryNode cancel subscribe dmChannels
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
......@@ -562,14 +486,14 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp
}
func NewQueryService(ctx context.Context) (*QueryService, error) {
nodes := make(map[UniqueID]*queryNodeInfo)
nodes := make([]*queryNodeInfo, 0)
ctx1, cancel := context.WithCancel(ctx)
replica := newMetaReplica()
service := &QueryService{
loopCtx: ctx1,
loopCancel: cancel,
replica: replica,
queryNodes: nodes,
replica: replica,
numRegisterNode: 0,
numQueryChannel: 0,
enableGrpc: false,
......@@ -590,77 +514,3 @@ func (qs *QueryService) SetDataService(dataService DataServiceInterface) {
func (qs *QueryService) SetEnableGrpc(en bool) {
qs.enableGrpc = en
}
func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error {
err := qs.replica.addDmChannels(collection.id, dmChannels)
if err != nil {
return err
}
node2channels := make(map[UniqueID][]string)
for i, channel := range dmChannels {
nodeID := assignedNodeIDs[i]
findChannel := false
for _, ch := range collection.dmChannelNames {
if channel == ch {
findChannel = true
}
}
if !findChannel {
if _, ok := node2channels[nodeID]; ok {
node2channels[nodeID] = append(node2channels[nodeID], channel)
} else {
channels := make([]string, 0)
channels = append(channels, channel)
node2channels[nodeID] = channels
}
}
}
for nodeID, channels := range node2channels {
node := qs.queryNodes[nodeID]
request := &querypb.WatchDmChannelsRequest{
ChannelIDs: channels,
}
_, err := node.WatchDmChannels(request)
node.AddDmChannels(channels)
if err != nil {
return err
}
}
return nil
}
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID {
maxNumDMChannel := 0
res := make([]UniqueID, 0)
node2lens := make(map[UniqueID]int)
for id, node := range qs.queryNodes {
node2lens[id] = len(node.dmChannelNames)
}
offset := 0
for {
lastOffset := offset
for id, len := range node2lens {
if len >= maxNumDMChannel {
maxNumDMChannel = len
} else {
res = append(res, id)
node2lens[id]++
offset++
}
}
if lastOffset == offset {
for id := range node2lens {
res = append(res, id)
node2lens[id]++
offset++
break
}
}
if offset == len(dmChannels) {
break
}
}
return res
}
......@@ -155,9 +155,6 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap
return ret, nil
}
func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
return []string{"test-insert"}, nil
}
func TestQueryService_Init(t *testing.T) {
service, err := NewQueryService(context.Background())
......
......@@ -63,19 +63,13 @@ func (producer *MsgProducer) broadcastMsg() {
func (producer *MsgProducer) Start(ctx context.Context) {
producer.ctx, producer.cancel = context.WithCancel(ctx)
producer.wg.Add(2 + len(producer.watchers))
go producer.startTTBarrier()
producer.wg.Add(1 + len(producer.watchers))
for _, watcher := range producer.watchers {
go producer.startWatcher(watcher)
}
go producer.broadcastMsg()
}
func (producer *MsgProducer) startTTBarrier() {
defer producer.wg.Done()
producer.ttBarrier.StartBackgroundLoop(producer.ctx)
}
func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) {
defer producer.wg.Done()
watcher.StartBackgroundLoop(producer.ctx)
......
......@@ -18,7 +18,7 @@ type (
TimeTickBarrier interface {
GetTimeTick() (Timestamp, error)
StartBackgroundLoop(ctx context.Context)
StartBackgroundLoop()
}
softTimeTickBarrier struct {
......@@ -38,7 +38,7 @@ type (
}
)
func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
return nil
......@@ -49,6 +49,7 @@ func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInte
sttbarrier.ttStream = ttStream
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
sttbarrier.ctx = ctx
for _, id := range peerIds {
sttbarrier.peer2LastTt[id] = Timestamp(0)
}
......@@ -79,12 +80,11 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
ttBarrier.ctx = ctx
func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() {
for {
select {
case <-ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ctx.Err())
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
return
default:
}
......@@ -137,14 +137,13 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
ttBarrier.ctx = ctx
func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() {
// Last timestamp synchronized
state := Timestamp(0)
for {
select {
case <-ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ctx.Err())
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
return
default:
}
......@@ -188,7 +187,7 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
return tempMin
}
func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!")
return nil
......@@ -199,6 +198,7 @@ func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTime
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
sttbarrier.ctx = ctx
for _, id := range peerIds {
sttbarrier.peer2Tt[id] = Timestamp(0)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册