diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index e577c525c89969c9ac6d7d4d23e81418c5c79a86..5599216bede48787d937a7edb66382dbf817e670 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -56,11 +56,12 @@ type ( } DataNode struct { - ctx context.Context - cancel context.CancelFunc - NodeID UniqueID - Role string - State internalpb2.StateCode + ctx context.Context + cancel context.CancelFunc + NodeID UniqueID + Role string + State internalpb2.StateCode + watchDm chan struct{} dataSyncService *dataSyncService metaService *metaService @@ -81,11 +82,13 @@ func NewDataNode(ctx context.Context) *DataNode { Params.Init() ctx2, cancel2 := context.WithCancel(ctx) node := &DataNode{ - ctx: ctx2, - cancel: cancel2, - NodeID: Params.NodeID, // GOOSE TODO: How to init - Role: typeutil.DataNodeRole, - State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic + ctx: ctx2, + cancel: cancel2, + NodeID: Params.NodeID, // GOOSE TODO: How to init + Role: typeutil.DataNodeRole, + State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic + watchDm: make(chan struct{}), + dataSyncService: nil, metaService: nil, masterService: nil, @@ -135,6 +138,13 @@ func (node *DataNode) Init() error { return errors.Errorf("Register node failed: %v", err) } + select { + case <-time.After(RPCConnectionTimeout): + return errors.New("Get DmChannels failed in 30 seconds") + case <-node.watchDm: + log.Println("insert channel names set") + } + for _, kv := range resp.InitParams.StartParams { switch kv.Key { case "DDChannelName": @@ -162,10 +172,10 @@ func (node *DataNode) Init() error { node.flushChan = make(chan *flushMsg, chanSize) node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc) + node.dataSyncService.init() node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica - node.dataSyncService.initNodes() // --- Opentracing --- cfg := &config.Configuration{ @@ -191,14 +201,9 @@ func (node *DataNode) Init() error { func (node *DataNode) Start() error { node.metaService.init() - return nil -} - -// DataNode is HEALTHY until StartSync() is called -func (node *DataNode) StartSync() { - node.dataSyncService.init() go node.dataSyncService.start() node.State = internalpb2.StateCode_HEALTHY + return nil } func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) { @@ -219,7 +224,7 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common default: Params.InsertChannelNames = in.GetChannelNames() status.ErrorCode = commonpb.ErrorCode_SUCCESS - node.StartSync() + node.watchDm <- struct{}{} return status, nil } } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 5b1c54e09e8accdb6ef8f923fa7103f0ddc751eb..fcb2c5cce87aaeca457e041f04769ea6afd50192 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -94,7 +94,6 @@ type ( ddChannelName string segmentInfoStream msgstream.MsgStream insertChannels []string - ttBarrier timesync.TimeTickBarrier } ) @@ -178,23 +177,23 @@ func (s *Server) initSegmentInfoChannel() { s.segmentInfoStream.Start() } func (s *Server) initMsgProducer() error { - var err error factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024) - if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil { - return err - } - s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName) + ttMsgStream, _ := factory.NewMsgStream(s.ctx) + ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName) + s.ttMsgStream = ttMsgStream s.ttMsgStream.Start() - if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil { - return err - } - s.k2sMsgStream.AsProducer(Params.K2SChannelNames) - s.k2sMsgStream.Start() + timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs()) dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster) + k2sStream, _ := factory.NewMsgStream(s.ctx) + k2sStream.AsProducer(Params.K2SChannelNames) + s.k2sMsgStream = k2sStream + s.k2sMsgStream.Start() k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream) - if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil { + producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher) + if err != nil { return err } + s.msgProducer = producer s.msgProducer.Start(s.ctx) return nil } @@ -298,11 +297,10 @@ func (s *Server) checkMasterIsHealthy() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(4) + s.serverLoopWg.Add(3) go s.startStatsChannel(s.serverLoopCtx) go s.startSegmentFlushChannel(s.serverLoopCtx) go s.startDDChannel(s.serverLoopCtx) - go s.startTTBarrier(s.serverLoopCtx) } func (s *Server) startStatsChannel(ctx context.Context) { @@ -390,12 +388,6 @@ func (s *Server) startDDChannel(ctx context.Context) { } } -func (s *Server) startTTBarrier(ctx context.Context) { - defer s.serverLoopWg.Done() - s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs()) - s.ttBarrier.StartBackgroundLoop() -} - func (s *Server) waitDataNodeRegister() { log.Println("waiting data node to register") <-s.registerFinishCh diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 2c605b93f4fae2c5dd6ac6e141c61ab4efde1f0f..9821405c9f594b0004bf8fa400ffe8271c8e1ab7 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -76,7 +76,6 @@ func (s *Server) Start() error { func (s *Server) Stop() error { err := s.core.Stop() - s.cancel() s.grpcServer.GracefulStop() return err } diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index b6ddde255d779cb0c51662e3950d2d95247b6b0e..06687170d7e6b90c3eeeba7e628644f9301ddf50 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -35,7 +35,7 @@ type Server struct { func (s *Server) Init() error { log.Println("query service init") if err := s.queryService.Init(); err != nil { - panic(err) + return err } s.queryService.SetEnableGrpc(true) return nil diff --git a/internal/proto/query_service.proto b/internal/proto/query_service.proto index 7a8b837b45d7d659c01163532fecbaa7208e9d42..aeba557f58c27c0e0042aeef154eda3bf65f659a 100644 --- a/internal/proto/query_service.proto +++ b/internal/proto/query_service.proto @@ -124,7 +124,7 @@ message LoadSegmentRequest { int64 partitionID = 4; repeated int64 segmentIDs = 5; repeated int64 fieldIDs = 6; - data.SegmentStateInfo last_segment_state = 7; + repeated data.SegmentStateInfo segment_states = 7; } message ReleaseSegmentRequest { diff --git a/internal/proto/querypb/query_service.pb.go b/internal/proto/querypb/query_service.pb.go index 64d1bb5b438d590d2e036f699473520002b09ae8..833984a93a0bff1f2717c69738f86a79d45081e9 100644 --- a/internal/proto/querypb/query_service.pb.go +++ b/internal/proto/querypb/query_service.pb.go @@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string { } type LoadSegmentRequest struct { - Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` - DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` - CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` - PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"` - SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"` - FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"` - LastSegmentState *datapb.SegmentStateInfo `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"` + DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"` + CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"` + PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"` + SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"` + FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"` + SegmentStates []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *LoadSegmentRequest) Reset() { *m = LoadSegmentRequest{} } @@ -1051,9 +1051,9 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 { return nil } -func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStateInfo { +func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo { if m != nil { - return m.LastSegmentState + return m.SegmentStates } return nil } @@ -1395,91 +1395,90 @@ func init() { func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) } var fileDescriptor_5fcb6756dc1afb8d = []byte{ - // 1331 bytes of a gzipped FileDescriptorProto + // 1324 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x5f, 0x73, 0xdb, 0x44, 0x10, 0xb7, 0x6c, 0xc7, 0xa9, 0x37, 0xae, 0xed, 0x5e, 0xfe, 0x19, 0x51, 0x4a, 0x39, 0xa0, 0x4d, - 0x13, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x49, 0xe2, 0x4e, 0xc6, 0x33, 0x34, 0xa4, 0x72, 0x3a, - 0x1d, 0x02, 0x1d, 0x23, 0x4b, 0x17, 0xe7, 0x5a, 0xfd, 0x71, 0x75, 0xe7, 0xa4, 0xc9, 0x0b, 0x30, - 0xc3, 0x23, 0x03, 0x9f, 0x81, 0x81, 0x81, 0x19, 0x5e, 0xf8, 0x36, 0xbc, 0xf0, 0x02, 0xdf, 0x84, - 0xd1, 0x49, 0x56, 0x24, 0x59, 0x8e, 0x9c, 0xba, 0x69, 0x78, 0xd3, 0x9d, 0xf6, 0xf6, 0xb7, 0xfb, - 0xdb, 0xbd, 0xbd, 0x5d, 0x98, 0x7d, 0xd6, 0x27, 0xce, 0x71, 0x9b, 0x11, 0xe7, 0x90, 0x6a, 0xa4, - 0xde, 0x73, 0x6c, 0x6e, 0x23, 0x64, 0x52, 0xe3, 0xb0, 0xcf, 0xbc, 0x55, 0x5d, 0x48, 0xc8, 0x25, - 0xcd, 0x36, 0x4d, 0xdb, 0xf2, 0xf6, 0xe4, 0x52, 0x58, 0x42, 0x2e, 0x53, 0x8b, 0x13, 0xc7, 0x52, - 0x0d, 0x7f, 0x8d, 0x74, 0x95, 0xab, 0x51, 0x9d, 0xf8, 0x1b, 0x98, 0x55, 0x48, 0x97, 0x32, 0x4e, - 0x9c, 0x6d, 0x5b, 0x27, 0x0a, 0x79, 0xd6, 0x27, 0x8c, 0xa3, 0x0f, 0x20, 0xdf, 0x51, 0x19, 0xa9, - 0x49, 0x37, 0xa5, 0xa5, 0x99, 0xb5, 0xeb, 0xf5, 0x08, 0xb2, 0x0f, 0x79, 0x9f, 0x75, 0x37, 0x54, - 0x46, 0x14, 0x21, 0x89, 0x3e, 0x82, 0x69, 0x55, 0xd7, 0x1d, 0xc2, 0x58, 0x2d, 0x7b, 0xc6, 0xa1, - 0x75, 0x4f, 0x46, 0x19, 0x08, 0xe3, 0x9f, 0x24, 0x98, 0x8b, 0x5a, 0xc0, 0x7a, 0xb6, 0xc5, 0x08, - 0xba, 0x0b, 0x05, 0xc6, 0x55, 0xde, 0x67, 0xbe, 0x11, 0xaf, 0x27, 0xea, 0x6b, 0x09, 0x11, 0xc5, - 0x17, 0x45, 0x1b, 0x30, 0x43, 0x2d, 0xca, 0xdb, 0x3d, 0xd5, 0x51, 0xcd, 0x81, 0x25, 0x6f, 0x45, - 0x4f, 0x06, 0xac, 0x34, 0x2d, 0xca, 0x77, 0x84, 0xa0, 0x02, 0x34, 0xf8, 0xc6, 0x8f, 0x61, 0xbe, - 0x75, 0x60, 0x1f, 0x6d, 0xda, 0x86, 0x41, 0x34, 0x4e, 0x6d, 0xeb, 0xc5, 0x49, 0x41, 0x90, 0xd7, - 0x3b, 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x88, 0x6f, 0xcc, 0x60, 0x21, 0xae, 0x7e, 0x12, 0x8f, 0xdf, - 0x81, 0xab, 0x5a, 0xa0, 0xaa, 0xd9, 0x70, 0x7d, 0xce, 0x2d, 0xe5, 0x94, 0xe8, 0x26, 0xfe, 0x4e, - 0x82, 0xf9, 0xcf, 0x6c, 0x55, 0xbf, 0x20, 0xa7, 0x10, 0x86, 0x52, 0x18, 0xb0, 0x96, 0x13, 0xff, - 0x22, 0x7b, 0xf8, 0x7b, 0x09, 0x6a, 0x0a, 0x31, 0x88, 0xca, 0xc8, 0x65, 0x9a, 0xf1, 0xad, 0x04, - 0x73, 0x6e, 0x00, 0x76, 0x54, 0x87, 0xd3, 0xcb, 0x31, 0xa1, 0xe7, 0x65, 0x58, 0xc8, 0x82, 0x49, - 0x32, 0x00, 0x43, 0xa9, 0x37, 0xd0, 0x74, 0x9a, 0x00, 0x91, 0x3d, 0x6c, 0x42, 0x25, 0x40, 0x73, - 0x8f, 0x13, 0x86, 0x6e, 0xc2, 0x4c, 0x48, 0x44, 0x00, 0xe6, 0x94, 0xf0, 0x16, 0xfa, 0x18, 0xa6, - 0x5c, 0x08, 0x22, 0xfc, 0x2b, 0xaf, 0xe1, 0xfa, 0x70, 0xfd, 0xa9, 0x47, 0xb5, 0x2a, 0xde, 0x01, - 0xfc, 0x9b, 0x04, 0x0b, 0x31, 0xbc, 0x57, 0xce, 0xf2, 0x10, 0x2f, 0xf9, 0x04, 0x5e, 0xfe, 0x90, - 0x60, 0x71, 0xc8, 0xd0, 0x49, 0x82, 0xb1, 0x07, 0x0b, 0x01, 0x40, 0x5b, 0x27, 0x4c, 0x73, 0x68, - 0xcf, 0xfd, 0xf6, 0xc2, 0x32, 0xb3, 0xf6, 0x76, 0x3a, 0x89, 0x4c, 0x99, 0x0f, 0x54, 0x34, 0x42, - 0x1a, 0xf0, 0xaf, 0x12, 0xcc, 0xb9, 0x97, 0xf8, 0xf2, 0x32, 0x77, 0x2c, 0x4e, 0x7f, 0x97, 0x60, - 0xd1, 0xbf, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x96, 0x40, 0xde, 0x74, 0x88, 0xca, 0xc9, 0x03, 0x37, - 0x0e, 0x9b, 0x07, 0xaa, 0x65, 0x11, 0x63, 0xb2, 0x04, 0xb8, 0x0d, 0x15, 0xc7, 0x73, 0xb6, 0xad, - 0x79, 0xfa, 0x84, 0xe9, 0x45, 0xa5, 0xec, 0x6f, 0xfb, 0x28, 0xe8, 0x5d, 0x28, 0x3b, 0x84, 0xf5, - 0x8d, 0x53, 0xb9, 0x9c, 0x90, 0xbb, 0xea, 0xed, 0xfa, 0x62, 0xf8, 0x17, 0x09, 0x16, 0xd7, 0x75, - 0x3d, 0x6c, 0xe0, 0x04, 0x77, 0x69, 0x05, 0xae, 0xc5, 0xac, 0xf3, 0xa9, 0x2d, 0x2a, 0xd5, 0xa8, - 0x7d, 0xcd, 0x06, 0xba, 0x03, 0xd5, 0xa8, 0x85, 0x3e, 0xd5, 0x45, 0xa5, 0x12, 0xb1, 0xb1, 0xd9, - 0xc0, 0x7f, 0x4b, 0x20, 0x2b, 0xc4, 0xb4, 0x0f, 0x49, 0xa2, 0xa1, 0x2f, 0xc4, 0xe4, 0xc0, 0xbb, - 0xec, 0x64, 0xde, 0xe5, 0xce, 0xe1, 0x5d, 0x3e, 0xd9, 0xbb, 0x27, 0xb0, 0xf0, 0x48, 0xe5, 0xda, - 0x41, 0xc3, 0x9c, 0x3c, 0x02, 0x37, 0x00, 0x02, 0x3c, 0xaf, 0x28, 0x14, 0x95, 0xd0, 0x0e, 0xfe, - 0x33, 0x0b, 0xc8, 0xbd, 0xe4, 0x2d, 0xd2, 0x35, 0x89, 0xc5, 0x5f, 0xfd, 0xc5, 0x89, 0xbd, 0x0b, - 0xf9, 0xe1, 0x77, 0xe1, 0x06, 0x00, 0xf3, 0xac, 0x73, 0x5d, 0x98, 0x12, 0x17, 0x2b, 0xb4, 0x83, - 0x64, 0xb8, 0xb2, 0x4f, 0x89, 0xa1, 0xbb, 0x7f, 0x0b, 0xe2, 0x6f, 0xb0, 0x46, 0x0f, 0x00, 0x19, - 0x2a, 0xe3, 0x6d, 0x5f, 0xbc, 0xed, 0x3d, 0x30, 0xd3, 0xc2, 0xab, 0x58, 0x6d, 0x74, 0xbb, 0xd5, - 0xba, 0x4f, 0x83, 0x28, 0x8c, 0x4d, 0x6b, 0xdf, 0x56, 0xaa, 0xee, 0xf1, 0xf0, 0x2e, 0xfe, 0x57, - 0x82, 0x79, 0xbf, 0xde, 0x5c, 0x1a, 0x69, 0x63, 0x54, 0x9b, 0x49, 0x68, 0xc3, 0x3f, 0x4a, 0xb0, - 0xb8, 0x69, 0x9b, 0x3d, 0xdb, 0x1a, 0xb8, 0x3d, 0xe1, 0x3b, 0xf5, 0x89, 0x77, 0x88, 0x0c, 0x7a, - 0xe4, 0x5b, 0x23, 0x7a, 0xe4, 0x38, 0xa8, 0x7f, 0x0a, 0xff, 0x23, 0xc1, 0x8c, 0xcf, 0xb6, 0x1b, - 0x16, 0x74, 0x1d, 0x8a, 0x81, 0x2b, 0x7e, 0x2f, 0x71, 0xba, 0x31, 0x44, 0x61, 0x36, 0x3d, 0xef, - 0x72, 0xc3, 0x79, 0xf7, 0x1a, 0x5c, 0x31, 0x89, 0xd9, 0x66, 0xf4, 0x84, 0xf8, 0x69, 0x39, 0x6d, - 0x12, 0xb3, 0x45, 0x4f, 0x88, 0xfb, 0xcb, 0xea, 0x9b, 0x6d, 0xc7, 0x3e, 0x72, 0x99, 0x15, 0xbf, - 0xac, 0xbe, 0xa9, 0xd8, 0x47, 0x0c, 0xbd, 0x01, 0x40, 0x2d, 0x9d, 0x3c, 0x6f, 0x5b, 0xaa, 0x49, - 0x6a, 0x05, 0x71, 0xc3, 0x8b, 0x62, 0x67, 0x5b, 0x35, 0x09, 0xaa, 0xc1, 0xb4, 0x58, 0x34, 0x1b, - 0x22, 0x0b, 0x73, 0xca, 0x60, 0x89, 0xf7, 0x01, 0x85, 0x3c, 0x9c, 0xe8, 0xc6, 0x87, 0xe2, 0x9e, - 0x8d, 0xc7, 0xdd, 0xed, 0xcd, 0x67, 0x23, 0x40, 0x93, 0xc4, 0xf5, 0x43, 0x98, 0xa2, 0xd6, 0xbe, - 0x3d, 0x68, 0x37, 0xde, 0x4c, 0x6a, 0x37, 0xc2, 0x60, 0x9e, 0xf4, 0xf2, 0x09, 0x94, 0xa3, 0x4d, - 0x08, 0x2a, 0xc1, 0x95, 0x6d, 0x9b, 0xdf, 0x7b, 0x4e, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xdb, - 0xe6, 0x3b, 0x0e, 0x61, 0xc4, 0xe2, 0x55, 0x09, 0x01, 0x14, 0x3e, 0xb7, 0x1a, 0x94, 0x3d, 0xad, - 0x66, 0xd1, 0xac, 0xdf, 0x5b, 0xaa, 0x46, 0xd3, 0xba, 0x4f, 0x4c, 0xdb, 0x39, 0xae, 0xe6, 0xdc, - 0xe3, 0xc1, 0x2a, 0x8f, 0xaa, 0x50, 0x0a, 0x44, 0xb6, 0x76, 0x1e, 0x56, 0xa7, 0x50, 0x11, 0xa6, - 0xbc, 0xcf, 0xc2, 0xda, 0x0f, 0x00, 0x25, 0xf1, 0x6a, 0xb4, 0xbc, 0xc9, 0x14, 0x69, 0x50, 0x0a, - 0x4f, 0x84, 0xe8, 0x76, 0x92, 0x13, 0x09, 0x53, 0xab, 0xbc, 0x94, 0x2e, 0xe8, 0x71, 0x8b, 0x33, - 0xe8, 0x09, 0x54, 0xa2, 0x63, 0x18, 0x43, 0x77, 0x12, 0xc9, 0x4a, 0x1a, 0x05, 0xe5, 0xe5, 0x71, - 0x44, 0x03, 0xac, 0x2e, 0x94, 0x23, 0xfd, 0x3e, 0x43, 0x4b, 0xa3, 0xce, 0xc7, 0x3b, 0x26, 0xf9, - 0xce, 0x18, 0x92, 0x01, 0xd0, 0x17, 0x50, 0x8e, 0x34, 0x88, 0x23, 0x80, 0x92, 0x9a, 0x48, 0xf9, - 0xac, 0xf4, 0xc2, 0x19, 0xd4, 0x86, 0x6b, 0xf1, 0xa6, 0x8e, 0xa1, 0x95, 0x64, 0xc2, 0x13, 0x7b, - 0xbf, 0x34, 0x80, 0x3d, 0xcf, 0xf6, 0x53, 0x02, 0x93, 0xe3, 0x91, 0x38, 0xc5, 0xa6, 0xe9, 0xfe, - 0x3a, 0x30, 0x3e, 0xa4, 0xfe, 0xbd, 0x33, 0x8c, 0x3f, 0x37, 0x42, 0x07, 0xd0, 0x70, 0x27, 0x89, - 0xe4, 0xc4, 0x43, 0xf7, 0xcc, 0x1e, 0x3f, 0x96, 0xeb, 0x49, 0xf0, 0xa3, 0xbb, 0x51, 0x9c, 0x41, - 0x8f, 0x00, 0x6d, 0x11, 0xbe, 0x4b, 0x4d, 0xb2, 0x4b, 0xb5, 0xa7, 0xe3, 0x60, 0xc4, 0x5e, 0x54, - 0x7f, 0xd1, 0xe2, 0x0e, 0xb5, 0xba, 0x91, 0xb4, 0x99, 0xdb, 0x22, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, - 0xc6, 0x5e, 0xa2, 0x6a, 0x5b, 0xd8, 0x1c, 0x9f, 0x3d, 0x97, 0xc7, 0x99, 0x82, 0x7c, 0xe2, 0x57, - 0xc6, 0x92, 0x0d, 0x00, 0xf7, 0x04, 0x60, 0xec, 0xd9, 0x3a, 0xd3, 0x93, 0x31, 0x9f, 0x3e, 0x9c, - 0x41, 0x1a, 0x94, 0x5d, 0x9e, 0x42, 0xcf, 0xde, 0xad, 0xb4, 0xfa, 0xea, 0x3b, 0x71, 0x3b, 0x55, - 0x6e, 0xe0, 0xc0, 0xda, 0x5f, 0x05, 0x28, 0x8a, 0x04, 0x10, 0xb5, 0xef, 0xc2, 0x62, 0xbe, 0x0b, - 0x15, 0x3f, 0xe6, 0x2f, 0x33, 0xdc, 0xed, 0x73, 0xb3, 0x9f, 0x18, 0xde, 0x11, 0xad, 0x0e, 0xce, - 0xa0, 0xc7, 0x50, 0x89, 0x4d, 0x43, 0xc9, 0x45, 0x68, 0xc4, 0xc8, 0x94, 0x76, 0x8d, 0x35, 0x40, - 0xc3, 0x63, 0x0c, 0xaa, 0x27, 0x57, 0x8a, 0x51, 0xe3, 0x4e, 0x1a, 0xc8, 0x57, 0x50, 0x89, 0x8d, - 0x13, 0xc9, 0x17, 0x22, 0x79, 0xe6, 0x48, 0xd3, 0xfe, 0x10, 0x4a, 0xa1, 0xf9, 0x81, 0x25, 0xa7, - 0xe8, 0xf0, 0x84, 0x91, 0xa6, 0xf6, 0x4b, 0xa8, 0x44, 0x9b, 0xec, 0x11, 0xef, 0x65, 0x62, 0x27, - 0x9e, 0x4e, 0xfb, 0xc5, 0x5f, 0xac, 0x8d, 0xf5, 0xbd, 0x4f, 0xbb, 0x94, 0x1f, 0xf4, 0x3b, 0x2e, - 0xfc, 0xea, 0x09, 0x35, 0x0c, 0x7a, 0xc2, 0x89, 0x76, 0xb0, 0xea, 0x69, 0x78, 0x5f, 0xa7, 0x8c, - 0x3b, 0xb4, 0xd3, 0xe7, 0x44, 0x5f, 0x1d, 0x14, 0x81, 0x55, 0xa1, 0x76, 0x55, 0xa8, 0xed, 0x75, - 0x3a, 0x05, 0xb1, 0xbc, 0xfb, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xdf, 0xec, 0xde, 0x9f, - 0x17, 0x00, 0x00, + 0x5b, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x69, 0xe3, 0x4e, 0xc6, 0x0c, 0x0d, 0x41, 0x4e, 0xa7, + 0x43, 0xa0, 0x63, 0x64, 0xe9, 0xe2, 0x5c, 0x6b, 0x49, 0xae, 0xee, 0x9c, 0x34, 0x79, 0x01, 0x66, + 0x78, 0x64, 0xe0, 0x33, 0x30, 0x30, 0xc0, 0xf0, 0x81, 0x78, 0xe1, 0x05, 0xbe, 0x09, 0xa3, 0x93, + 0xac, 0x48, 0xf2, 0x39, 0x72, 0xea, 0xa6, 0xe1, 0x4d, 0x77, 0xda, 0xdb, 0xdf, 0xee, 0x6f, 0xf7, + 0xf6, 0x76, 0x61, 0xfe, 0xe9, 0x80, 0xb8, 0x87, 0x6d, 0x46, 0xdc, 0x7d, 0x6a, 0x90, 0x7a, 0xdf, + 0x75, 0xb8, 0x83, 0x90, 0x45, 0x7b, 0xfb, 0x03, 0xe6, 0xaf, 0xea, 0x42, 0x42, 0x2d, 0x19, 0x8e, + 0x65, 0x39, 0xb6, 0xbf, 0xa7, 0x96, 0xa2, 0x12, 0x6a, 0x99, 0xda, 0x9c, 0xb8, 0xb6, 0xde, 0x0b, + 0xd6, 0xc8, 0xd4, 0xb9, 0x1e, 0xd7, 0x89, 0xbf, 0x81, 0x79, 0x8d, 0x74, 0x29, 0xe3, 0xc4, 0xdd, + 0x74, 0x4c, 0xa2, 0x91, 0xa7, 0x03, 0xc2, 0x38, 0x7a, 0x0f, 0xf2, 0x1d, 0x9d, 0x91, 0x9a, 0x72, + 0x55, 0x59, 0x99, 0x5b, 0xbb, 0x5c, 0x8f, 0x21, 0x07, 0x90, 0xf7, 0x59, 0xf7, 0xae, 0xce, 0x88, + 0x26, 0x24, 0xd1, 0x07, 0x30, 0xab, 0x9b, 0xa6, 0x4b, 0x18, 0xab, 0x65, 0x4f, 0x38, 0x74, 0xc7, + 0x97, 0xd1, 0x86, 0xc2, 0xf8, 0x27, 0x05, 0x16, 0xe2, 0x16, 0xb0, 0xbe, 0x63, 0x33, 0x82, 0x6e, + 0x43, 0x81, 0x71, 0x9d, 0x0f, 0x58, 0x60, 0xc4, 0xab, 0x52, 0x7d, 0x2d, 0x21, 0xa2, 0x05, 0xa2, + 0xe8, 0x2e, 0xcc, 0x51, 0x9b, 0xf2, 0x76, 0x5f, 0x77, 0x75, 0x6b, 0x68, 0xc9, 0x1b, 0xf1, 0x93, + 0x21, 0x2b, 0x4d, 0x9b, 0xf2, 0x2d, 0x21, 0xa8, 0x01, 0x0d, 0xbf, 0xf1, 0x23, 0x58, 0x6c, 0xed, + 0x39, 0x07, 0xeb, 0x4e, 0xaf, 0x47, 0x0c, 0x4e, 0x1d, 0xfb, 0xf9, 0x49, 0x41, 0x90, 0x37, 0x3b, + 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x89, 0x6f, 0xcc, 0x60, 0x29, 0xa9, 0x7e, 0x1a, 0x8f, 0xdf, 0x82, + 0x8b, 0x46, 0xa8, 0xaa, 0xd9, 0xf0, 0x7c, 0xce, 0xad, 0xe4, 0xb4, 0xf8, 0x26, 0xfe, 0x4e, 0x81, + 0xc5, 0x4f, 0x1d, 0xdd, 0x3c, 0x23, 0xa7, 0x10, 0x86, 0x52, 0x14, 0xb0, 0x96, 0x13, 0xff, 0x62, + 0x7b, 0xf8, 0x7b, 0x05, 0x6a, 0x1a, 0xe9, 0x11, 0x9d, 0x91, 0xf3, 0x34, 0xe3, 0x5b, 0x05, 0x16, + 0xbc, 0x00, 0x6c, 0xe9, 0x2e, 0xa7, 0xe7, 0x63, 0x42, 0xdf, 0xcf, 0xb0, 0x88, 0x05, 0xd3, 0x64, + 0x00, 0x86, 0x52, 0x7f, 0xa8, 0xe9, 0x38, 0x01, 0x62, 0x7b, 0xd8, 0x82, 0x4a, 0x88, 0xe6, 0x1d, + 0x27, 0x0c, 0x5d, 0x85, 0xb9, 0x88, 0x88, 0x00, 0xcc, 0x69, 0xd1, 0x2d, 0xf4, 0x21, 0xcc, 0x78, + 0x10, 0x44, 0xf8, 0x57, 0x5e, 0xc3, 0xf5, 0xd1, 0xfa, 0x53, 0x8f, 0x6b, 0xd5, 0xfc, 0x03, 0xf8, + 0x37, 0x05, 0x96, 0x12, 0x78, 0x2f, 0x9d, 0xe5, 0x11, 0x5e, 0xf2, 0x12, 0x5e, 0xfe, 0x54, 0x60, + 0x79, 0xc4, 0xd0, 0x69, 0x82, 0xb1, 0x03, 0x4b, 0x21, 0x40, 0xdb, 0x24, 0xcc, 0x70, 0x69, 0xdf, + 0xfb, 0xf6, 0xc3, 0x32, 0xb7, 0xf6, 0x66, 0x3a, 0x89, 0x4c, 0x5b, 0x0c, 0x55, 0x34, 0x22, 0x1a, + 0xf0, 0xaf, 0x0a, 0x2c, 0x78, 0x97, 0xf8, 0xfc, 0x32, 0x77, 0x22, 0x4e, 0x7f, 0x57, 0x60, 0x39, + 0xb8, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x56, 0x40, 0x5d, 0x77, 0x89, 0xce, 0xc9, 0xe7, 0x5e, 0x1c, + 0xd6, 0xf7, 0x74, 0xdb, 0x26, 0xbd, 0xe9, 0x12, 0xe0, 0x3a, 0x54, 0x5c, 0xdf, 0xd9, 0xb6, 0xe1, + 0xeb, 0x13, 0xa6, 0x17, 0xb5, 0x72, 0xb0, 0x1d, 0xa0, 0xa0, 0xb7, 0xa1, 0xec, 0x12, 0x36, 0xe8, + 0x1d, 0xcb, 0xe5, 0x84, 0xdc, 0x45, 0x7f, 0x37, 0x10, 0xc3, 0xbf, 0x28, 0xb0, 0x7c, 0xc7, 0x34, + 0xa3, 0x06, 0x4e, 0x71, 0x97, 0x6e, 0xc1, 0xa5, 0x84, 0x75, 0x01, 0xb5, 0x45, 0xad, 0x1a, 0xb7, + 0xaf, 0xd9, 0x40, 0x37, 0xa0, 0x1a, 0xb7, 0x30, 0xa0, 0xba, 0xa8, 0x55, 0x62, 0x36, 0x36, 0x1b, + 0xf8, 0x6f, 0x05, 0x54, 0x8d, 0x58, 0xce, 0x3e, 0x91, 0x1a, 0xfa, 0x5c, 0x4c, 0x0e, 0xbd, 0xcb, + 0x4e, 0xe7, 0x5d, 0xee, 0x14, 0xde, 0xe5, 0xe5, 0xde, 0x3d, 0x86, 0xa5, 0x87, 0x3a, 0x37, 0xf6, + 0x1a, 0xd6, 0xf4, 0x11, 0xb8, 0x02, 0x10, 0xe2, 0xf9, 0x45, 0xa1, 0xa8, 0x45, 0x76, 0xf0, 0x1f, + 0x59, 0x40, 0xde, 0x25, 0x6f, 0x91, 0xae, 0x45, 0x6c, 0xfe, 0xf2, 0x2f, 0x4e, 0xe2, 0x5d, 0xc8, + 0x8f, 0xbe, 0x0b, 0x57, 0x00, 0x98, 0x6f, 0x9d, 0xe7, 0xc2, 0x8c, 0xb8, 0x58, 0x91, 0x1d, 0xa4, + 0xc2, 0x85, 0x5d, 0x4a, 0x7a, 0xa6, 0xf7, 0xb7, 0x20, 0xfe, 0x86, 0x6b, 0xf4, 0x09, 0x94, 0x03, + 0xc9, 0xb6, 0x78, 0x2a, 0x58, 0x6d, 0x56, 0x56, 0x17, 0xbd, 0x4e, 0xb5, 0x1e, 0x50, 0x20, 0x8a, + 0x62, 0xd3, 0xde, 0x75, 0xb4, 0x8b, 0x2c, 0xb2, 0xc3, 0xf0, 0xbf, 0x0a, 0x2c, 0x06, 0x85, 0xe6, + 0xdc, 0xd8, 0x9a, 0xa0, 0xcc, 0x4c, 0xc3, 0x17, 0xfe, 0x51, 0x81, 0xe5, 0x75, 0xc7, 0xea, 0x3b, + 0x76, 0xe8, 0xf7, 0x74, 0xf5, 0xe9, 0x23, 0xff, 0x10, 0x19, 0x36, 0xc7, 0xd7, 0xc6, 0x34, 0xc7, + 0x49, 0xd0, 0xe0, 0x14, 0xfe, 0x47, 0x81, 0xb9, 0x80, 0x6d, 0x2f, 0x26, 0xe8, 0x32, 0x14, 0x43, + 0x57, 0x82, 0x26, 0xe2, 0x78, 0x63, 0x84, 0xc2, 0x6c, 0x7a, 0xc2, 0xe5, 0x46, 0x13, 0xee, 0x15, + 0xb8, 0x60, 0x11, 0xab, 0xcd, 0xe8, 0x11, 0x09, 0xf2, 0x71, 0xd6, 0x22, 0x56, 0x8b, 0x1e, 0x11, + 0xef, 0x97, 0x3d, 0xb0, 0xda, 0xae, 0x73, 0xe0, 0x31, 0x2b, 0x7e, 0xd9, 0x03, 0x4b, 0x73, 0x0e, + 0x18, 0x7a, 0x0d, 0x80, 0xda, 0x26, 0x79, 0xd6, 0xb6, 0x75, 0x8b, 0xd4, 0x0a, 0xe2, 0x6a, 0x17, + 0xc5, 0xce, 0xa6, 0x6e, 0x11, 0x54, 0x83, 0x59, 0xb1, 0x68, 0x36, 0x6a, 0xb3, 0xfe, 0xc1, 0x60, + 0x89, 0x77, 0x01, 0x45, 0x3c, 0x9c, 0xea, 0xaa, 0x47, 0xe2, 0x9e, 0x4d, 0xc6, 0xdd, 0x6b, 0xca, + 0xe7, 0x63, 0x40, 0xd3, 0xc4, 0xf5, 0x7d, 0x98, 0xa1, 0xf6, 0xae, 0x33, 0xec, 0x33, 0x5e, 0x97, + 0xf5, 0x19, 0x51, 0x30, 0x5f, 0xfa, 0xe6, 0x11, 0x94, 0xe3, 0xdd, 0x07, 0x2a, 0xc1, 0x85, 0x4d, + 0x87, 0xdf, 0x7b, 0x46, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xd3, 0xe1, 0x5b, 0x2e, 0x61, 0xc4, + 0xe6, 0x55, 0x05, 0x01, 0x14, 0x3e, 0xb3, 0x1b, 0x94, 0x3d, 0xa9, 0x66, 0xd1, 0x7c, 0xd0, 0x54, + 0xea, 0xbd, 0xa6, 0x7d, 0x9f, 0x58, 0x8e, 0x7b, 0x58, 0xcd, 0x79, 0xc7, 0xc3, 0x55, 0x1e, 0x55, + 0xa1, 0x14, 0x8a, 0x6c, 0x6c, 0x3d, 0xa8, 0xce, 0xa0, 0x22, 0xcc, 0xf8, 0x9f, 0x85, 0xb5, 0x1f, + 0x00, 0x4a, 0xe2, 0xb9, 0x68, 0xf9, 0x23, 0x29, 0x32, 0xa0, 0x14, 0x1d, 0x05, 0xd1, 0x75, 0x99, + 0x13, 0x92, 0x71, 0x55, 0x5d, 0x49, 0x17, 0xf4, 0xb9, 0xc5, 0x19, 0xf4, 0x18, 0x2a, 0xf1, 0xf9, + 0x8b, 0xa1, 0x1b, 0x52, 0xb2, 0x64, 0x33, 0xa0, 0x7a, 0x73, 0x12, 0xd1, 0x10, 0xab, 0x0b, 0xe5, + 0x58, 0xa3, 0xcf, 0xd0, 0xca, 0xb8, 0xf3, 0xc9, 0x56, 0x49, 0xbd, 0x31, 0x81, 0x64, 0x08, 0xf4, + 0x05, 0x94, 0x63, 0x9d, 0xe1, 0x18, 0x20, 0x59, 0xf7, 0xa8, 0x9e, 0x94, 0x5e, 0x38, 0x83, 0xda, + 0x70, 0x29, 0xd9, 0xcd, 0x31, 0x74, 0x4b, 0x4e, 0xb8, 0xb4, 0xe9, 0x4b, 0x03, 0xd8, 0xf1, 0x6d, + 0x3f, 0x26, 0x50, 0x1e, 0x0f, 0xe9, 0xf8, 0x9a, 0xa6, 0xfb, 0xeb, 0xd0, 0xf8, 0x88, 0xfa, 0x77, + 0x4e, 0x30, 0xfe, 0xd4, 0x08, 0x1d, 0x40, 0xa3, 0x2d, 0x24, 0x52, 0xa5, 0x87, 0xee, 0x59, 0x7d, + 0x7e, 0xa8, 0xd6, 0x65, 0xf0, 0xe3, 0xdb, 0x50, 0x9c, 0x41, 0x0f, 0x01, 0x6d, 0x10, 0xbe, 0x4d, + 0x2d, 0xb2, 0x4d, 0x8d, 0x27, 0x93, 0x60, 0x24, 0x9e, 0xd3, 0x60, 0xd1, 0xe2, 0x2e, 0xb5, 0xbb, + 0xb1, 0xb4, 0x59, 0xd8, 0x20, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, 0xc1, 0x5e, 0xa0, 0x6a, 0x47, 0xd8, + 0x9c, 0x1c, 0x3a, 0x6f, 0x4e, 0x32, 0xfe, 0x04, 0xc4, 0xdf, 0x9a, 0x48, 0x36, 0x04, 0xdc, 0x11, + 0x80, 0x89, 0x67, 0xeb, 0x44, 0x4f, 0x26, 0x7c, 0xfa, 0x70, 0x06, 0x19, 0x50, 0xf6, 0x78, 0x8a, + 0x3c, 0x7b, 0xd7, 0xd2, 0xea, 0x6b, 0xe0, 0xc4, 0xf5, 0x54, 0xb9, 0xa1, 0x03, 0x6b, 0x7f, 0x15, + 0xa0, 0x28, 0x12, 0x40, 0xd4, 0xbe, 0x33, 0x8b, 0xf9, 0x36, 0x54, 0x82, 0x98, 0xbf, 0xc8, 0x70, + 0xb7, 0x4f, 0xcd, 0xbe, 0x34, 0xbc, 0x63, 0x5a, 0x1d, 0x9c, 0x41, 0x8f, 0xa0, 0x92, 0x18, 0x83, + 0xe4, 0x45, 0x68, 0xcc, 0xac, 0x94, 0x76, 0x8d, 0x0d, 0x40, 0xa3, 0xf3, 0x0b, 0xaa, 0xcb, 0x2b, + 0xc5, 0xb8, 0x39, 0x27, 0x0d, 0xe4, 0x2b, 0xa8, 0x24, 0xe6, 0x08, 0xf9, 0x85, 0x90, 0x0f, 0x1b, + 0x69, 0xda, 0x1f, 0x40, 0x29, 0x32, 0x38, 0x30, 0x79, 0x8a, 0x8e, 0x8e, 0x16, 0x69, 0x6a, 0xbf, + 0x84, 0x4a, 0xbc, 0xc9, 0x1e, 0xf3, 0x5e, 0x4a, 0x3b, 0xf1, 0x74, 0xda, 0xcf, 0xfe, 0x62, 0xdd, + 0xbd, 0xb3, 0xf3, 0x71, 0x97, 0xf2, 0xbd, 0x41, 0xc7, 0x83, 0x5f, 0x3d, 0xa2, 0xbd, 0x1e, 0x3d, + 0xe2, 0xc4, 0xd8, 0x5b, 0xf5, 0x35, 0xbc, 0x6b, 0x52, 0xc6, 0x5d, 0xda, 0x19, 0x70, 0x62, 0xae, + 0x0e, 0x8b, 0xc0, 0xaa, 0x50, 0xbb, 0x2a, 0xd4, 0xf6, 0x3b, 0x9d, 0x82, 0x58, 0xde, 0xfe, 0x2f, + 0x00, 0x00, 0xff, 0xff, 0x36, 0xce, 0x45, 0x9b, 0x98, 0x17, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/internal/proxynode/impl.go b/internal/proxynode/impl.go index 33de45bdae91699c962feec2ba6327e2cd568f0f..6e951f477cfe36034e5c0bddc508a24d89d5cdcd 100644 --- a/internal/proxynode/impl.go +++ b/internal/proxynode/impl.go @@ -132,11 +132,63 @@ func (node *NodeImpl) HasCollection(request *milvuspb.HasCollectionRequest) (*mi } func (node *NodeImpl) LoadCollection(request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) { - panic("implement me") + log.Println("load collection: ", request) + ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + defer cancel() + + lct := &LoadCollectionTask{ + Condition: NewTaskCondition(ctx), + LoadCollectionRequest: request, + queryserviceClient: node.queryServiceClient, + } + + err := node.sched.DdQueue.Enqueue(lct) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + err = lct.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + return lct.result, nil } func (node *NodeImpl) ReleaseCollection(request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) { - panic("implement me") + log.Println("release collection: ", request) + ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + defer cancel() + + rct := &ReleaseCollectionTask{ + Condition: NewTaskCondition(ctx), + ReleaseCollectionRequest: request, + queryserviceClient: node.queryServiceClient, + } + + err := node.sched.DdQueue.Enqueue(rct) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + err = rct.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + return rct.result, nil } func (node *NodeImpl) DescribeCollection(request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { @@ -332,11 +384,63 @@ func (node *NodeImpl) HasPartition(request *milvuspb.HasPartitionRequest) (*milv } func (node *NodeImpl) LoadPartitions(request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) { - panic("implement me") + log.Println("load partitions: ", request) + ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + defer cancel() + + lpt := &LoadPartitionTask{ + Condition: NewTaskCondition(ctx), + LoadPartitonRequest: request, + queryserviceClient: node.queryServiceClient, + } + + err := node.sched.DdQueue.Enqueue(lpt) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + err = lpt.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + return lpt.result, nil } func (node *NodeImpl) ReleasePartitions(request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) { - panic("implement me") + log.Println("load partitions: ", request) + ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval) + defer cancel() + + rpt := &ReleasePartitionTask{ + Condition: NewTaskCondition(ctx), + ReleasePartitionRequest: request, + queryserviceClient: node.queryServiceClient, + } + + err := node.sched.DdQueue.Enqueue(rpt) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + err = rpt.WaitToFinish() + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, nil + } + + return rpt.result, nil } func (node *NodeImpl) GetPartitionStatistics(request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) { diff --git a/internal/proxynode/task.go b/internal/proxynode/task.go index 2a5c688def306a6bded9341a98bffe11fb31cb9f..bfa92e0ed24580e6cdc10343e61b04db1ac6b358 100644 --- a/internal/proxynode/task.go +++ b/internal/proxynode/task.go @@ -1,7 +1,6 @@ package proxynode import ( - "context" "errors" "log" "math" @@ -1519,7 +1518,6 @@ type LoadCollectionTask struct { *milvuspb.LoadCollectionRequest queryserviceClient QueryServiceClient result *commonpb.Status - ctx context.Context } func (lct *LoadCollectionTask) OnEnqueue() error { @@ -1592,7 +1590,6 @@ type ReleaseCollectionTask struct { *milvuspb.ReleaseCollectionRequest queryserviceClient QueryServiceClient result *commonpb.Status - ctx context.Context } func (rct *ReleaseCollectionTask) OnEnqueue() error { @@ -1665,7 +1662,6 @@ type LoadPartitionTask struct { *milvuspb.LoadPartitonRequest queryserviceClient QueryServiceClient result *commonpb.Status - ctx context.Context } func (lpt *LoadPartitionTask) OnEnqueue() error { @@ -1747,7 +1743,6 @@ type ReleasePartitionTask struct { *milvuspb.ReleasePartitionRequest queryserviceClient QueryServiceClient result *commonpb.Status - ctx context.Context } func (rpt *ReleasePartitionTask) OnEnqueue() error { diff --git a/internal/querynode/query_node.go b/internal/querynode/query_node.go index ff9545780a836b799f672e4c968a99721bac8a8e..9f4629c1b924392a036df8ae4f6ccf75dbc80f7e 100644 --- a/internal/querynode/query_node.go +++ b/internal/querynode/query_node.go @@ -445,18 +445,20 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S } // segments are ordered before LoadSegments calling - if in.LastSegmentState.State == commonpb.SegmentState_SegmentGrowing { - segmentNum := len(segmentIDs) - position := in.LastSegmentState.StartPosition - err = node.loadService.seekSegment(position) - if err != nil { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), + for i, state := range in.SegmentStates { + if state.State == commonpb.SegmentState_SegmentGrowing { + position := state.StartPosition + err = node.loadService.seekSegment(position) + if err != nil { + status := &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + return status, err } - return status, err + segmentIDs = segmentIDs[:i] + break } - segmentIDs = segmentIDs[:segmentNum-1] } err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs) diff --git a/internal/queryservice/meta_replica.go b/internal/queryservice/meta_replica.go index 2d165ad460a0681feff2b288d58964be3b2225dd..c7c97085a44700dfe085ee9ca7b8c3796d219e5d 100644 --- a/internal/queryservice/meta_replica.go +++ b/internal/queryservice/meta_replica.go @@ -13,6 +13,9 @@ type metaReplica interface { loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error) + releaseCollection(dbID UniqueID, collectionID UniqueID) error + releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error + addDmChannels(collectionID UniqueID, channels []string) error } type segment struct { @@ -26,9 +29,9 @@ type partition struct { } type collection struct { - id UniqueID - partitions map[UniqueID]*partition - node2channel map[int][]string + id UniqueID + partitions map[UniqueID]*partition + dmChannelNames []string } type metaReplicaImpl struct { @@ -48,18 +51,18 @@ func newMetaReplica() metaReplica { } func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { + //TODO:: assert dbID = 0 exist if _, ok := mp.db2collections[dbID]; ok { partitions := make(map[UniqueID]*partition) - node2channel := make(map[int][]string) newCollection := &collection{ - id: collectionID, - partitions: partitions, - node2channel: node2channel, + id: collectionID, + partitions: partitions, } mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection) return newCollection, nil } - return nil, errors.New("can't find dbID when add collection") + + return nil, errors.New("addCollection: can't find dbID when add collection") } func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) { @@ -78,7 +81,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa } } } - return nil, errors.New("can't find collection when add partition") + return nil, errors.New("addPartition: can't find collection when add partition") } func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) { @@ -86,7 +89,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) return collections, nil } - return nil, errors.New("can't find collectionID") + return nil, errors.New("getCollections: can't find collectionID") } func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) { @@ -102,7 +105,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ( } } - return nil, errors.New("can't find partitionIDs") + return nil, errors.New("getPartitions: can't find partitionIDs") } func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) { @@ -119,7 +122,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par } } } - return nil, errors.New("can't find segmentID") + return nil, errors.New("getSegments: can't find segmentID") } func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) { @@ -127,14 +130,16 @@ func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) if collections, err := mp.getCollections(dbID); err == nil { for _, collection := range collections { if collectionID == collection.id { - return res, nil + res = collection } } - } else { - res, err = mp.addCollection(dbID, collectionID) + } + if res == nil { + collection, err := mp.addCollection(dbID, collectionID) if err != nil { return nil, err } + res = collection } return res, nil } @@ -177,7 +182,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID, } } } - return errors.New("update partition state fail") + return errors.New("updatePartitionState: update partition state fail") } func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID, @@ -203,3 +208,54 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID, } return partitionStates, nil } + +func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error { + if collections, ok := mp.db2collections[dbID]; ok { + for i, collection := range collections { + if collectionID == collection.id { + collections = append(collections[:i], collections[i+1:]...) + return nil + } + } + } + return errors.New("releaseCollection: can't find dbID or collectionID") +} + +func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error { + if collections, ok := mp.db2collections[dbID]; ok { + for _, collection := range collections { + if collectionID == collection.id { + if _, ok := collection.partitions[partitionID]; ok { + delete(collection.partitions, partitionID) + return nil + } + } + } + } + return errors.New("releasePartition: can't find dbID or collectionID or partitionID") +} + +func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error { + //TODO :: use dbID + if collections, ok := mp.db2collections[0]; ok { + for _, collection := range collections { + if collectionID == collection.id { + dmChannels := collection.dmChannelNames + for _, channel := range channels { + match := false + for _, existedChannel := range dmChannels { + if channel == existedChannel { + match = true + break + } + } + if !match { + dmChannels = append(dmChannels, channel) + } + } + return nil + } + } + } + return errors.New("addDmChannels: can't find dbID or collectionID") +} diff --git a/internal/queryservice/querynode.go b/internal/queryservice/querynode.go index bc43fa7c0318233d32a2e149598e1bd710978cdc..46b03fa414fe5c3359cfe671c61bcfc8363413e2 100644 --- a/internal/queryservice/querynode.go +++ b/internal/queryservice/querynode.go @@ -8,8 +8,6 @@ import ( type queryNodeInfo struct { client QueryNodeInterface - insertChannels string - nodeID uint64 segments []UniqueID dmChannelNames []string } @@ -25,3 +23,21 @@ func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb func (qn *queryNodeInfo) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) { return qn.client.GetSegmentInfo(in) } + +func (qn *queryNodeInfo) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) { + return qn.client.WatchDmChannels(in) +} + +func (qn *queryNodeInfo) AddDmChannels(channels []string) { + qn.dmChannelNames = append(qn.dmChannelNames, channels...) +} + +func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo { + segments := make([]UniqueID, 0) + dmChannelNames := make([]string, 0) + return &queryNodeInfo{ + client: client, + segments: segments, + dmChannelNames: dmChannelNames, + } +} diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 01b60046b105f3aa3cb4a5e286cf206a223ea41d..cb55a836db5f7d81166c46e3732bb3f54be3ad03 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -3,7 +3,6 @@ package queryservice import ( "context" "fmt" - "log" "sort" "strconv" "sync/atomic" @@ -25,6 +24,7 @@ type MasterServiceInterface interface { type DataServiceInterface interface { GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) + GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) } type QueryNodeInterface interface { @@ -47,7 +47,7 @@ type QueryService struct { dataServiceClient DataServiceInterface masterServiceClient MasterServiceInterface - queryNodes []*queryNodeInfo + queryNodes map[UniqueID]*queryNodeInfo numRegisterNode uint64 numQueryChannel uint64 @@ -87,7 +87,7 @@ func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, erro componentStates, err := node.GetComponentStates() if err != nil { subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{ - NodeID: int64(nodeID), + NodeID: nodeID, StateCode: internalpb2.StateCode_ABNORMAL, }) continue @@ -111,28 +111,21 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { return Params.StatsChannelName, nil } -// TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { fmt.Println("register query node =", req.Address) // TODO:: add mutex - allocatedID := uint64(len(qs.queryNodes)) + allocatedID := len(qs.queryNodes) registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10) var node *queryNodeInfo if qs.enableGrpc { client := nodeclient.NewClient(registerNodeAddress) - node = &queryNodeInfo{ - client: client, - nodeID: allocatedID, - } + node = newQueryNodeInfo(client) } else { - client := querynode.NewQueryNode(qs.loopCtx, allocatedID) - node = &queryNodeInfo{ - client: client, - nodeID: allocatedID, - } + client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID)) + node = newQueryNodeInfo(client) } - qs.queryNodes = append(qs.queryNodes, node) + qs.queryNodes[UniqueID(allocatedID)] = node //TODO::return init params to queryNode return &querypb.RegisterNodeResponse{ @@ -186,10 +179,20 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com if err != nil { return fn(err), err } - if collection == nil { + + if len(collection.dmChannelNames) != 0 { return fn(nil), nil } + channelRequest := datapb.InsertChannelRequest{ + DbID: req.DbID, + CollectionID: req.CollectionID, + } + dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if err != nil { + return fn(err), err + } + // get partitionIDs showPartitionRequest := &milvuspb.ShowPartitionRequest{ Base: &commonpb.MsgBase{ @@ -207,6 +210,21 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com } partitionIDs := showPartitionResponse.PartitionIDs + if len(partitionIDs) == 0 { + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + } + for _, node := range qs.queryNodes { + _, err := node.LoadSegments(loadSegmentRequest) + if err != nil { + return fn(err), err + } + } + nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.watchDmChannels(dmChannels, nodeIDs, collection) + return fn(err), err + } + loadPartitionsRequest := &querypb.LoadPartitionRequest{ Base: req.Base, DbID: dbID, @@ -244,6 +262,14 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest) status, err := qs.ReleasePartitions(releasePartitionRequest) + err = qs.replica.releaseCollection(dbID, collectionID) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, err + } + //TODO:: queryNode cancel subscribe dmChannels return status, err } @@ -276,16 +302,40 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm dbID := req.DbID collectionID := req.CollectionID partitionIDs := req.PartitionIDs - qs.replica.loadPartition(dbID, collectionID, partitionIDs[0]) + fn := func(err error) *commonpb.Status { + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + } + } return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, - Reason: err.Error(), + ErrorCode: commonpb.ErrorCode_SUCCESS, } } - // get segments and load segment to query node + if len(partitionIDs) == 0 { + err := errors.New("partitionIDs are empty") + return fn(err), err + } + + var collection *collection = nil + var err error + if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil { + return fn(err), err + } + for _, partitionID := range partitionIDs { + partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID) + if err != nil { + return fn(err), err + } + + if partition == nil { + return fn(err), nil + } + showSegmentRequest := &milvuspb.ShowSegmentRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowSegment, @@ -297,92 +347,110 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm if err != nil { return fn(err), err } - if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { - log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason) - } + segmentIDs := showSegmentResponse.SegmentIDs segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo) - channel2id := make(map[string]int) - //id2channels := make(map[int][]string) - id2segs := make(map[int][]UniqueID) - offset := 0 + channel2segs := make(map[string][]UniqueID) resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{ SegmentIDs: segmentIDs, }) if err != nil { - log.Fatal("get segment states fail") + return fn(err), err } for _, state := range resp.States { segmentID := state.SegmentID segmentStates[segmentID] = state - var flatChannelName string - // channelNames := make([]string, 0) - // for i, str := range state.StartPositions { - // flatChannelName += str.ChannelName - // channelNames = append(channelNames, str.ChannelName) - // if i+1 < len(state.StartPositions) { - // flatChannelName += "/" - // } - // } - if flatChannelName == "" { - log.Fatal("segmentState's channel name is empty") - } - if _, ok := channel2id[flatChannelName]; !ok { - channel2id[flatChannelName] = offset - //id2channels[offset] = channelNames - id2segs[offset] = make([]UniqueID, 0) - id2segs[offset] = append(id2segs[offset], segmentID) - offset++ + + channelName := state.StartPosition.ChannelName + + if _, ok := channel2segs[channelName]; !ok { + segments := make([]UniqueID, 0) + segments = append(segments, segmentID) + channel2segs[channelName] = segments } else { - //TODO::check channel name - id := channel2id[flatChannelName] - id2segs[id] = append(id2segs[id], segmentID) + channel2segs[channelName] = append(channel2segs[channelName], segmentID) } } - for key, value := range id2segs { - sort.Slice(value, func(i, j int) bool { return segmentStates[value[i]].CreateTime < segmentStates[value[j]].CreateTime }) - selectedSegs := make([]UniqueID, 0) - for i, v := range value { - if segmentStates[v].State == commonpb.SegmentState_SegmentFlushed { - selectedSegs = append(selectedSegs, v) - } else { - if i > 0 && segmentStates[selectedSegs[i-1]].State != commonpb.SegmentState_SegmentFlushed { + + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) + for channel, segmentIDs := range channel2segs { + sort.Slice(segmentIDs, func(i, j int) bool { + return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp + }) + var channelLoadDone = false + for _, node := range qs.queryNodes { + channels2node := node.dmChannelNames + for _, ch := range channels2node { + if channel == ch { + channelLoadDone = true break } - selectedSegs = append(selectedSegs, v) + } + if channelLoadDone { + break + } + } + if !channelLoadDone { + states := make([]*datapb.SegmentStateInfo, 0) + for _, id := range segmentIDs { + states = append(states, segmentStates[id]) + } + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + PartitionID: partitionID, + SegmentIDs: segmentIDs, + SegmentStates: states, + } + dmChannels := []string{channel} + nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.watchDmChannels(dmChannels, nodeIDs, collection) + if err != nil { + return fn(err), err + } + queryNode := qs.queryNodes[nodeIDs[0]] + //TODO:: seek when loadSegment may cause more msgs consumed + status, err := queryNode.LoadSegments(loadSegmentRequest) + if err != nil { + return status, err } } - id2segs[key] = selectedSegs } - qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory) + qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) + } - // TODO:: filter channel for query node - for channels, i := range channel2id { - for key, node := range qs.queryNodes { - if channels == node.insertChannels { - statesID := id2segs[i][len(id2segs[i])-1] - //TODO :: should be start position - // position := segmentStates[statesID-1].StartPositions - // segmentStates[statesID].StartPositions = position - loadSegmentRequest := &querypb.LoadSegmentRequest{ - CollectionID: collectionID, - PartitionID: partitionID, - SegmentIDs: id2segs[i], - LastSegmentState: segmentStates[statesID], - } - status, err := qs.queryNodes[key].LoadSegments(loadSegmentRequest) - if err != nil { - return status, err - } + if len(collection.dmChannelNames) == 0 { + channelRequest := datapb.InsertChannelRequest{ + DbID: dbID, + CollectionID: collectionID, + } + + dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest) + if err != nil { + return fn(err), err + } + for _, partitionID := range partitionIDs { + loadSegmentRequest := &querypb.LoadSegmentRequest{ + CollectionID: collectionID, + PartitionID: partitionID, + } + for _, node := range qs.queryNodes { + _, err := node.LoadSegments(loadSegmentRequest) + if err != nil { + return fn(err), nil } } + nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels) + err = qs.watchDmChannels(dmChannels, nodeIDs, collection) + } + if err != nil { + return fn(err), err } - qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory) } + return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -407,6 +475,13 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) } segmentIDs = append(segmentIDs, res...) + err = qs.replica.releasePartition(dbID, collectionID, partitionID) + if err != nil { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: err.Error(), + }, err + } } releaseSegmentRequest := &querypb.ReleaseSegmentRequest{ Base: req.Base, @@ -423,6 +498,7 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest) } } + //TODO:: queryNode cancel subscribe dmChannels return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, nil @@ -486,14 +562,14 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp } func NewQueryService(ctx context.Context) (*QueryService, error) { - nodes := make([]*queryNodeInfo, 0) + nodes := make(map[UniqueID]*queryNodeInfo) ctx1, cancel := context.WithCancel(ctx) replica := newMetaReplica() service := &QueryService{ loopCtx: ctx1, loopCancel: cancel, - queryNodes: nodes, replica: replica, + queryNodes: nodes, numRegisterNode: 0, numQueryChannel: 0, enableGrpc: false, @@ -514,3 +590,77 @@ func (qs *QueryService) SetDataService(dataService DataServiceInterface) { func (qs *QueryService) SetEnableGrpc(en bool) { qs.enableGrpc = en } + +func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error { + err := qs.replica.addDmChannels(collection.id, dmChannels) + if err != nil { + return err + } + node2channels := make(map[UniqueID][]string) + for i, channel := range dmChannels { + nodeID := assignedNodeIDs[i] + findChannel := false + for _, ch := range collection.dmChannelNames { + if channel == ch { + findChannel = true + } + } + if !findChannel { + if _, ok := node2channels[nodeID]; ok { + node2channels[nodeID] = append(node2channels[nodeID], channel) + } else { + channels := make([]string, 0) + channels = append(channels, channel) + node2channels[nodeID] = channels + } + } + } + + for nodeID, channels := range node2channels { + node := qs.queryNodes[nodeID] + request := &querypb.WatchDmChannelsRequest{ + ChannelIDs: channels, + } + _, err := node.WatchDmChannels(request) + node.AddDmChannels(channels) + if err != nil { + return err + } + } + + return nil +} + +func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID { + maxNumDMChannel := 0 + res := make([]UniqueID, 0) + node2lens := make(map[UniqueID]int) + for id, node := range qs.queryNodes { + node2lens[id] = len(node.dmChannelNames) + } + offset := 0 + for { + lastOffset := offset + for id, len := range node2lens { + if len >= maxNumDMChannel { + maxNumDMChannel = len + } else { + res = append(res, id) + node2lens[id]++ + offset++ + } + } + if lastOffset == offset { + for id := range node2lens { + res = append(res, id) + node2lens[id]++ + offset++ + break + } + } + if offset == len(dmChannels) { + break + } + } + return res +} diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index e0f26c162f8c08445c5dde019bf423fea9dde2a6..a6e36bbee1c1cddb17b93dd8f30cc539b969ccc1 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -155,6 +155,9 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap return ret, nil } +func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) { + return []string{"test-insert"}, nil +} func TestQueryService_Init(t *testing.T) { service, err := NewQueryService(context.Background()) diff --git a/internal/timesync/time_sync_producer.go b/internal/timesync/time_sync_producer.go index 1fc4288ff2ebf18c73c5f7556927f24544dc628d..55f74dfacb7af4b95605161bc238d8a3e8011b75 100644 --- a/internal/timesync/time_sync_producer.go +++ b/internal/timesync/time_sync_producer.go @@ -63,13 +63,19 @@ func (producer *MsgProducer) broadcastMsg() { func (producer *MsgProducer) Start(ctx context.Context) { producer.ctx, producer.cancel = context.WithCancel(ctx) - producer.wg.Add(1 + len(producer.watchers)) + producer.wg.Add(2 + len(producer.watchers)) + go producer.startTTBarrier() for _, watcher := range producer.watchers { go producer.startWatcher(watcher) } go producer.broadcastMsg() } +func (producer *MsgProducer) startTTBarrier() { + defer producer.wg.Done() + producer.ttBarrier.StartBackgroundLoop(producer.ctx) +} + func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) { defer producer.wg.Done() watcher.StartBackgroundLoop(producer.ctx) diff --git a/internal/timesync/timesync.go b/internal/timesync/timesync.go index 9df5c1e93222291ced1b6a0703539ce67b8c0f46..4f74aca5a798180c8b69868d2b029fa80cde61dd 100644 --- a/internal/timesync/timesync.go +++ b/internal/timesync/timesync.go @@ -18,7 +18,7 @@ type ( TimeTickBarrier interface { GetTimeTick() (Timestamp, error) - StartBackgroundLoop() + StartBackgroundLoop(ctx context.Context) } softTimeTickBarrier struct { @@ -38,7 +38,7 @@ type ( } ) -func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier { +func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier { if len(peerIds) <= 0 { log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n") return nil @@ -49,7 +49,6 @@ func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds sttbarrier.ttStream = ttStream sttbarrier.outTt = make(chan Timestamp, 1024) sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp) - sttbarrier.ctx = ctx for _, id := range peerIds { sttbarrier.peer2LastTt[id] = Timestamp(0) } @@ -80,11 +79,12 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) { } } -func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() { +func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) { + ttBarrier.ctx = ctx for { select { - case <-ttBarrier.ctx.Done(): - log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err()) + case <-ctx.Done(): + log.Printf("[TtBarrierStart] %s\n", ctx.Err()) return default: } @@ -137,13 +137,14 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) { } } -func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() { +func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) { + ttBarrier.ctx = ctx // Last timestamp synchronized state := Timestamp(0) for { select { - case <-ttBarrier.ctx.Done(): - log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err()) + case <-ctx.Done(): + log.Printf("[TtBarrierStart] %s\n", ctx.Err()) return default: } @@ -187,7 +188,7 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp { return tempMin } -func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier { +func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier { if len(peerIds) <= 0 { log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!") return nil @@ -198,7 +199,6 @@ func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds sttbarrier.outTt = make(chan Timestamp, 1024) sttbarrier.peer2Tt = make(map[UniqueID]Timestamp) - sttbarrier.ctx = ctx for _, id := range peerIds { sttbarrier.peer2Tt[id] = Timestamp(0) }