提交 e0e8e160 编写于 作者: D dragondriver 提交者: yefu.chen

Implement the release-related task and load-related task in Proxy

Signed-off-by: Ndragondriver <jiquan.long@zilliz.com>
上级 7270d18c
......@@ -56,11 +56,12 @@ type (
}
DataNode struct {
ctx context.Context
cancel context.CancelFunc
NodeID UniqueID
Role string
State internalpb2.StateCode
ctx context.Context
cancel context.CancelFunc
NodeID UniqueID
Role string
State internalpb2.StateCode
watchDm chan struct{}
dataSyncService *dataSyncService
metaService *metaService
......@@ -81,11 +82,13 @@ func NewDataNode(ctx context.Context) *DataNode {
Params.Init()
ctx2, cancel2 := context.WithCancel(ctx)
node := &DataNode{
ctx: ctx2,
cancel: cancel2,
NodeID: Params.NodeID, // GOOSE TODO: How to init
Role: typeutil.DataNodeRole,
State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
ctx: ctx2,
cancel: cancel2,
NodeID: Params.NodeID, // GOOSE TODO: How to init
Role: typeutil.DataNodeRole,
State: internalpb2.StateCode_INITIALIZING, // GOOSE TODO: atomic
watchDm: make(chan struct{}),
dataSyncService: nil,
metaService: nil,
masterService: nil,
......@@ -135,6 +138,13 @@ func (node *DataNode) Init() error {
return errors.Errorf("Register node failed: %v", err)
}
select {
case <-time.After(RPCConnectionTimeout):
return errors.New("Get DmChannels failed in 30 seconds")
case <-node.watchDm:
log.Println("insert channel names set")
}
for _, kv := range resp.InitParams.StartParams {
switch kv.Key {
case "DDChannelName":
......@@ -162,10 +172,10 @@ func (node *DataNode) Init() error {
node.flushChan = make(chan *flushMsg, chanSize)
node.dataSyncService = newDataSyncService(node.ctx, node.flushChan, replica, alloc)
node.dataSyncService.init()
node.metaService = newMetaService(node.ctx, replica, node.masterService)
node.replica = replica
node.dataSyncService.initNodes()
// --- Opentracing ---
cfg := &config.Configuration{
......@@ -191,14 +201,9 @@ func (node *DataNode) Init() error {
func (node *DataNode) Start() error {
node.metaService.init()
return nil
}
// DataNode is HEALTHY until StartSync() is called
func (node *DataNode) StartSync() {
node.dataSyncService.init()
go node.dataSyncService.start()
node.State = internalpb2.StateCode_HEALTHY
return nil
}
func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
......@@ -219,7 +224,7 @@ func (node *DataNode) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*common
default:
Params.InsertChannelNames = in.GetChannelNames()
status.ErrorCode = commonpb.ErrorCode_SUCCESS
node.StartSync()
node.watchDm <- struct{}{}
return status, nil
}
}
......
......@@ -94,7 +94,6 @@ type (
ddChannelName string
segmentInfoStream msgstream.MsgStream
insertChannels []string
ttBarrier timesync.TimeTickBarrier
}
)
......@@ -178,23 +177,23 @@ func (s *Server) initSegmentInfoChannel() {
s.segmentInfoStream.Start()
}
func (s *Server) initMsgProducer() error {
var err error
factory := pulsarms.NewFactory(Params.PulsarAddress, 1024, 1024)
if s.ttMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
return err
}
s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
ttMsgStream, _ := factory.NewMsgStream(s.ctx)
ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
s.ttMsgStream = ttMsgStream
s.ttMsgStream.Start()
if s.k2sMsgStream, err = factory.NewMsgStream(s.ctx); err != nil {
return err
}
s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
s.k2sMsgStream.Start()
timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs())
dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
k2sStream, _ := factory.NewMsgStream(s.ctx)
k2sStream.AsProducer(Params.K2SChannelNames)
s.k2sMsgStream = k2sStream
s.k2sMsgStream.Start()
k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
producer, err := timesync.NewTimeSyncMsgProducer(timeTickBarrier, dataNodeTTWatcher, k2sMsgWatcher)
if err != nil {
return err
}
s.msgProducer = producer
s.msgProducer.Start(s.ctx)
return nil
}
......@@ -298,11 +297,10 @@ func (s *Server) checkMasterIsHealthy() error {
func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(4)
s.serverLoopWg.Add(3)
go s.startStatsChannel(s.serverLoopCtx)
go s.startSegmentFlushChannel(s.serverLoopCtx)
go s.startDDChannel(s.serverLoopCtx)
go s.startTTBarrier(s.serverLoopCtx)
}
func (s *Server) startStatsChannel(ctx context.Context) {
......@@ -390,12 +388,6 @@ func (s *Server) startDDChannel(ctx context.Context) {
}
}
func (s *Server) startTTBarrier(ctx context.Context) {
defer s.serverLoopWg.Done()
s.ttBarrier = timesync.NewHardTimeTickBarrier(ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
s.ttBarrier.StartBackgroundLoop()
}
func (s *Server) waitDataNodeRegister() {
log.Println("waiting data node to register")
<-s.registerFinishCh
......
......@@ -76,7 +76,6 @@ func (s *Server) Start() error {
func (s *Server) Stop() error {
err := s.core.Stop()
s.cancel()
s.grpcServer.GracefulStop()
return err
}
......
......@@ -35,7 +35,7 @@ type Server struct {
func (s *Server) Init() error {
log.Println("query service init")
if err := s.queryService.Init(); err != nil {
panic(err)
return err
}
s.queryService.SetEnableGrpc(true)
return nil
......
......@@ -124,7 +124,7 @@ message LoadSegmentRequest {
int64 partitionID = 4;
repeated int64 segmentIDs = 5;
repeated int64 fieldIDs = 6;
data.SegmentStateInfo last_segment_state = 7;
repeated data.SegmentStateInfo segment_states = 7;
}
message ReleaseSegmentRequest {
......
......@@ -972,16 +972,16 @@ func (m *WatchDmChannelsRequest) GetChannelIDs() []string {
}
type LoadSegmentRequest struct {
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
LastSegmentState *datapb.SegmentStateInfo `protobuf:"bytes,7,opt,name=last_segment_state,json=lastSegmentState,proto3" json:"last_segment_state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Base *commonpb.MsgBase `protobuf:"bytes,1,opt,name=base,proto3" json:"base,omitempty"`
DbID int64 `protobuf:"varint,2,opt,name=dbID,proto3" json:"dbID,omitempty"`
CollectionID int64 `protobuf:"varint,3,opt,name=collectionID,proto3" json:"collectionID,omitempty"`
PartitionID int64 `protobuf:"varint,4,opt,name=partitionID,proto3" json:"partitionID,omitempty"`
SegmentIDs []int64 `protobuf:"varint,5,rep,packed,name=segmentIDs,proto3" json:"segmentIDs,omitempty"`
FieldIDs []int64 `protobuf:"varint,6,rep,packed,name=fieldIDs,proto3" json:"fieldIDs,omitempty"`
SegmentStates []*datapb.SegmentStateInfo `protobuf:"bytes,7,rep,name=segment_states,json=segmentStates,proto3" json:"segment_states,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *LoadSegmentRequest) Reset() { *m = LoadSegmentRequest{} }
......@@ -1051,9 +1051,9 @@ func (m *LoadSegmentRequest) GetFieldIDs() []int64 {
return nil
}
func (m *LoadSegmentRequest) GetLastSegmentState() *datapb.SegmentStateInfo {
func (m *LoadSegmentRequest) GetSegmentStates() []*datapb.SegmentStateInfo {
if m != nil {
return m.LastSegmentState
return m.SegmentStates
}
return nil
}
......@@ -1395,91 +1395,90 @@ func init() {
func init() { proto.RegisterFile("query_service.proto", fileDescriptor_5fcb6756dc1afb8d) }
var fileDescriptor_5fcb6756dc1afb8d = []byte{
// 1331 bytes of a gzipped FileDescriptorProto
// 1324 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xcc, 0x57, 0x5f, 0x73, 0xdb, 0x44,
0x10, 0xb7, 0x6c, 0xc7, 0xa9, 0x37, 0xae, 0xed, 0x5e, 0xfe, 0x19, 0x51, 0x4a, 0x39, 0xa0, 0x4d,
0x13, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x49, 0xe2, 0x4e, 0xc6, 0x33, 0x34, 0xa4, 0x72, 0x3a,
0x1d, 0x02, 0x1d, 0x23, 0x4b, 0x17, 0xe7, 0x5a, 0xfd, 0x71, 0x75, 0xe7, 0xa4, 0xc9, 0x0b, 0x30,
0xc3, 0x23, 0x03, 0x9f, 0x81, 0x81, 0x81, 0x19, 0x5e, 0xf8, 0x36, 0xbc, 0xf0, 0x02, 0xdf, 0x84,
0xd1, 0x49, 0x56, 0x24, 0x59, 0x8e, 0x9c, 0xba, 0x69, 0x78, 0xd3, 0x9d, 0xf6, 0xf6, 0xb7, 0xfb,
0xdb, 0xbd, 0xbd, 0x5d, 0x98, 0x7d, 0xd6, 0x27, 0xce, 0x71, 0x9b, 0x11, 0xe7, 0x90, 0x6a, 0xa4,
0xde, 0x73, 0x6c, 0x6e, 0x23, 0x64, 0x52, 0xe3, 0xb0, 0xcf, 0xbc, 0x55, 0x5d, 0x48, 0xc8, 0x25,
0xcd, 0x36, 0x4d, 0xdb, 0xf2, 0xf6, 0xe4, 0x52, 0x58, 0x42, 0x2e, 0x53, 0x8b, 0x13, 0xc7, 0x52,
0x0d, 0x7f, 0x8d, 0x74, 0x95, 0xab, 0x51, 0x9d, 0xf8, 0x1b, 0x98, 0x55, 0x48, 0x97, 0x32, 0x4e,
0x9c, 0x6d, 0x5b, 0x27, 0x0a, 0x79, 0xd6, 0x27, 0x8c, 0xa3, 0x0f, 0x20, 0xdf, 0x51, 0x19, 0xa9,
0x49, 0x37, 0xa5, 0xa5, 0x99, 0xb5, 0xeb, 0xf5, 0x08, 0xb2, 0x0f, 0x79, 0x9f, 0x75, 0x37, 0x54,
0x46, 0x14, 0x21, 0x89, 0x3e, 0x82, 0x69, 0x55, 0xd7, 0x1d, 0xc2, 0x58, 0x2d, 0x7b, 0xc6, 0xa1,
0x75, 0x4f, 0x46, 0x19, 0x08, 0xe3, 0x9f, 0x24, 0x98, 0x8b, 0x5a, 0xc0, 0x7a, 0xb6, 0xc5, 0x08,
0xba, 0x0b, 0x05, 0xc6, 0x55, 0xde, 0x67, 0xbe, 0x11, 0xaf, 0x27, 0xea, 0x6b, 0x09, 0x11, 0xc5,
0x17, 0x45, 0x1b, 0x30, 0x43, 0x2d, 0xca, 0xdb, 0x3d, 0xd5, 0x51, 0xcd, 0x81, 0x25, 0x6f, 0x45,
0x4f, 0x06, 0xac, 0x34, 0x2d, 0xca, 0x77, 0x84, 0xa0, 0x02, 0x34, 0xf8, 0xc6, 0x8f, 0x61, 0xbe,
0x75, 0x60, 0x1f, 0x6d, 0xda, 0x86, 0x41, 0x34, 0x4e, 0x6d, 0xeb, 0xc5, 0x49, 0x41, 0x90, 0xd7,
0x3b, 0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x88, 0x6f, 0xcc, 0x60, 0x21, 0xae, 0x7e, 0x12, 0x8f, 0xdf,
0x81, 0xab, 0x5a, 0xa0, 0xaa, 0xd9, 0x70, 0x7d, 0xce, 0x2d, 0xe5, 0x94, 0xe8, 0x26, 0xfe, 0x4e,
0x82, 0xf9, 0xcf, 0x6c, 0x55, 0xbf, 0x20, 0xa7, 0x10, 0x86, 0x52, 0x18, 0xb0, 0x96, 0x13, 0xff,
0x22, 0x7b, 0xf8, 0x7b, 0x09, 0x6a, 0x0a, 0x31, 0x88, 0xca, 0xc8, 0x65, 0x9a, 0xf1, 0xad, 0x04,
0x73, 0x6e, 0x00, 0x76, 0x54, 0x87, 0xd3, 0xcb, 0x31, 0xa1, 0xe7, 0x65, 0x58, 0xc8, 0x82, 0x49,
0x32, 0x00, 0x43, 0xa9, 0x37, 0xd0, 0x74, 0x9a, 0x00, 0x91, 0x3d, 0x6c, 0x42, 0x25, 0x40, 0x73,
0x8f, 0x13, 0x86, 0x6e, 0xc2, 0x4c, 0x48, 0x44, 0x00, 0xe6, 0x94, 0xf0, 0x16, 0xfa, 0x18, 0xa6,
0x5c, 0x08, 0x22, 0xfc, 0x2b, 0xaf, 0xe1, 0xfa, 0x70, 0xfd, 0xa9, 0x47, 0xb5, 0x2a, 0xde, 0x01,
0xfc, 0x9b, 0x04, 0x0b, 0x31, 0xbc, 0x57, 0xce, 0xf2, 0x10, 0x2f, 0xf9, 0x04, 0x5e, 0xfe, 0x90,
0x60, 0x71, 0xc8, 0xd0, 0x49, 0x82, 0xb1, 0x07, 0x0b, 0x01, 0x40, 0x5b, 0x27, 0x4c, 0x73, 0x68,
0xcf, 0xfd, 0xf6, 0xc2, 0x32, 0xb3, 0xf6, 0x76, 0x3a, 0x89, 0x4c, 0x99, 0x0f, 0x54, 0x34, 0x42,
0x1a, 0xf0, 0xaf, 0x12, 0xcc, 0xb9, 0x97, 0xf8, 0xf2, 0x32, 0x77, 0x2c, 0x4e, 0x7f, 0x97, 0x60,
0xd1, 0xbf, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x96, 0x40, 0xde, 0x74, 0x88, 0xca, 0xc9, 0x03, 0x37,
0x0e, 0x9b, 0x07, 0xaa, 0x65, 0x11, 0x63, 0xb2, 0x04, 0xb8, 0x0d, 0x15, 0xc7, 0x73, 0xb6, 0xad,
0x79, 0xfa, 0x84, 0xe9, 0x45, 0xa5, 0xec, 0x6f, 0xfb, 0x28, 0xe8, 0x5d, 0x28, 0x3b, 0x84, 0xf5,
0x8d, 0x53, 0xb9, 0x9c, 0x90, 0xbb, 0xea, 0xed, 0xfa, 0x62, 0xf8, 0x17, 0x09, 0x16, 0xd7, 0x75,
0x3d, 0x6c, 0xe0, 0x04, 0x77, 0x69, 0x05, 0xae, 0xc5, 0xac, 0xf3, 0xa9, 0x2d, 0x2a, 0xd5, 0xa8,
0x7d, 0xcd, 0x06, 0xba, 0x03, 0xd5, 0xa8, 0x85, 0x3e, 0xd5, 0x45, 0xa5, 0x12, 0xb1, 0xb1, 0xd9,
0xc0, 0x7f, 0x4b, 0x20, 0x2b, 0xc4, 0xb4, 0x0f, 0x49, 0xa2, 0xa1, 0x2f, 0xc4, 0xe4, 0xc0, 0xbb,
0xec, 0x64, 0xde, 0xe5, 0xce, 0xe1, 0x5d, 0x3e, 0xd9, 0xbb, 0x27, 0xb0, 0xf0, 0x48, 0xe5, 0xda,
0x41, 0xc3, 0x9c, 0x3c, 0x02, 0x37, 0x00, 0x02, 0x3c, 0xaf, 0x28, 0x14, 0x95, 0xd0, 0x0e, 0xfe,
0x33, 0x0b, 0xc8, 0xbd, 0xe4, 0x2d, 0xd2, 0x35, 0x89, 0xc5, 0x5f, 0xfd, 0xc5, 0x89, 0xbd, 0x0b,
0xf9, 0xe1, 0x77, 0xe1, 0x06, 0x00, 0xf3, 0xac, 0x73, 0x5d, 0x98, 0x12, 0x17, 0x2b, 0xb4, 0x83,
0x64, 0xb8, 0xb2, 0x4f, 0x89, 0xa1, 0xbb, 0x7f, 0x0b, 0xe2, 0x6f, 0xb0, 0x46, 0x0f, 0x00, 0x19,
0x2a, 0xe3, 0x6d, 0x5f, 0xbc, 0xed, 0x3d, 0x30, 0xd3, 0xc2, 0xab, 0x58, 0x6d, 0x74, 0xbb, 0xd5,
0xba, 0x4f, 0x83, 0x28, 0x8c, 0x4d, 0x6b, 0xdf, 0x56, 0xaa, 0xee, 0xf1, 0xf0, 0x2e, 0xfe, 0x57,
0x82, 0x79, 0xbf, 0xde, 0x5c, 0x1a, 0x69, 0x63, 0x54, 0x9b, 0x49, 0x68, 0xc3, 0x3f, 0x4a, 0xb0,
0xb8, 0x69, 0x9b, 0x3d, 0xdb, 0x1a, 0xb8, 0x3d, 0xe1, 0x3b, 0xf5, 0x89, 0x77, 0x88, 0x0c, 0x7a,
0xe4, 0x5b, 0x23, 0x7a, 0xe4, 0x38, 0xa8, 0x7f, 0x0a, 0xff, 0x23, 0xc1, 0x8c, 0xcf, 0xb6, 0x1b,
0x16, 0x74, 0x1d, 0x8a, 0x81, 0x2b, 0x7e, 0x2f, 0x71, 0xba, 0x31, 0x44, 0x61, 0x36, 0x3d, 0xef,
0x72, 0xc3, 0x79, 0xf7, 0x1a, 0x5c, 0x31, 0x89, 0xd9, 0x66, 0xf4, 0x84, 0xf8, 0x69, 0x39, 0x6d,
0x12, 0xb3, 0x45, 0x4f, 0x88, 0xfb, 0xcb, 0xea, 0x9b, 0x6d, 0xc7, 0x3e, 0x72, 0x99, 0x15, 0xbf,
0xac, 0xbe, 0xa9, 0xd8, 0x47, 0x0c, 0xbd, 0x01, 0x40, 0x2d, 0x9d, 0x3c, 0x6f, 0x5b, 0xaa, 0x49,
0x6a, 0x05, 0x71, 0xc3, 0x8b, 0x62, 0x67, 0x5b, 0x35, 0x09, 0xaa, 0xc1, 0xb4, 0x58, 0x34, 0x1b,
0x22, 0x0b, 0x73, 0xca, 0x60, 0x89, 0xf7, 0x01, 0x85, 0x3c, 0x9c, 0xe8, 0xc6, 0x87, 0xe2, 0x9e,
0x8d, 0xc7, 0xdd, 0xed, 0xcd, 0x67, 0x23, 0x40, 0x93, 0xc4, 0xf5, 0x43, 0x98, 0xa2, 0xd6, 0xbe,
0x3d, 0x68, 0x37, 0xde, 0x4c, 0x6a, 0x37, 0xc2, 0x60, 0x9e, 0xf4, 0xf2, 0x09, 0x94, 0xa3, 0x4d,
0x08, 0x2a, 0xc1, 0x95, 0x6d, 0x9b, 0xdf, 0x7b, 0x4e, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xdb,
0xe6, 0x3b, 0x0e, 0x61, 0xc4, 0xe2, 0x55, 0x09, 0x01, 0x14, 0x3e, 0xb7, 0x1a, 0x94, 0x3d, 0xad,
0x66, 0xd1, 0xac, 0xdf, 0x5b, 0xaa, 0x46, 0xd3, 0xba, 0x4f, 0x4c, 0xdb, 0x39, 0xae, 0xe6, 0xdc,
0xe3, 0xc1, 0x2a, 0x8f, 0xaa, 0x50, 0x0a, 0x44, 0xb6, 0x76, 0x1e, 0x56, 0xa7, 0x50, 0x11, 0xa6,
0xbc, 0xcf, 0xc2, 0xda, 0x0f, 0x00, 0x25, 0xf1, 0x6a, 0xb4, 0xbc, 0xc9, 0x14, 0x69, 0x50, 0x0a,
0x4f, 0x84, 0xe8, 0x76, 0x92, 0x13, 0x09, 0x53, 0xab, 0xbc, 0x94, 0x2e, 0xe8, 0x71, 0x8b, 0x33,
0xe8, 0x09, 0x54, 0xa2, 0x63, 0x18, 0x43, 0x77, 0x12, 0xc9, 0x4a, 0x1a, 0x05, 0xe5, 0xe5, 0x71,
0x44, 0x03, 0xac, 0x2e, 0x94, 0x23, 0xfd, 0x3e, 0x43, 0x4b, 0xa3, 0xce, 0xc7, 0x3b, 0x26, 0xf9,
0xce, 0x18, 0x92, 0x01, 0xd0, 0x17, 0x50, 0x8e, 0x34, 0x88, 0x23, 0x80, 0x92, 0x9a, 0x48, 0xf9,
0xac, 0xf4, 0xc2, 0x19, 0xd4, 0x86, 0x6b, 0xf1, 0xa6, 0x8e, 0xa1, 0x95, 0x64, 0xc2, 0x13, 0x7b,
0xbf, 0x34, 0x80, 0x3d, 0xcf, 0xf6, 0x53, 0x02, 0x93, 0xe3, 0x91, 0x38, 0xc5, 0xa6, 0xe9, 0xfe,
0x3a, 0x30, 0x3e, 0xa4, 0xfe, 0xbd, 0x33, 0x8c, 0x3f, 0x37, 0x42, 0x07, 0xd0, 0x70, 0x27, 0x89,
0xe4, 0xc4, 0x43, 0xf7, 0xcc, 0x1e, 0x3f, 0x96, 0xeb, 0x49, 0xf0, 0xa3, 0xbb, 0x51, 0x9c, 0x41,
0x8f, 0x00, 0x6d, 0x11, 0xbe, 0x4b, 0x4d, 0xb2, 0x4b, 0xb5, 0xa7, 0xe3, 0x60, 0xc4, 0x5e, 0x54,
0x7f, 0xd1, 0xe2, 0x0e, 0xb5, 0xba, 0x91, 0xb4, 0x99, 0xdb, 0x22, 0xa2, 0xc2, 0x53, 0xc6, 0xa9,
0xc6, 0x5e, 0xa2, 0x6a, 0x5b, 0xd8, 0x1c, 0x9f, 0x3d, 0x97, 0xc7, 0x99, 0x82, 0x7c, 0xe2, 0x57,
0xc6, 0x92, 0x0d, 0x00, 0xf7, 0x04, 0x60, 0xec, 0xd9, 0x3a, 0xd3, 0x93, 0x31, 0x9f, 0x3e, 0x9c,
0x41, 0x1a, 0x94, 0x5d, 0x9e, 0x42, 0xcf, 0xde, 0xad, 0xb4, 0xfa, 0xea, 0x3b, 0x71, 0x3b, 0x55,
0x6e, 0xe0, 0xc0, 0xda, 0x5f, 0x05, 0x28, 0x8a, 0x04, 0x10, 0xb5, 0xef, 0xc2, 0x62, 0xbe, 0x0b,
0x15, 0x3f, 0xe6, 0x2f, 0x33, 0xdc, 0xed, 0x73, 0xb3, 0x9f, 0x18, 0xde, 0x11, 0xad, 0x0e, 0xce,
0xa0, 0xc7, 0x50, 0x89, 0x4d, 0x43, 0xc9, 0x45, 0x68, 0xc4, 0xc8, 0x94, 0x76, 0x8d, 0x35, 0x40,
0xc3, 0x63, 0x0c, 0xaa, 0x27, 0x57, 0x8a, 0x51, 0xe3, 0x4e, 0x1a, 0xc8, 0x57, 0x50, 0x89, 0x8d,
0x13, 0xc9, 0x17, 0x22, 0x79, 0xe6, 0x48, 0xd3, 0xfe, 0x10, 0x4a, 0xa1, 0xf9, 0x81, 0x25, 0xa7,
0xe8, 0xf0, 0x84, 0x91, 0xa6, 0xf6, 0x4b, 0xa8, 0x44, 0x9b, 0xec, 0x11, 0xef, 0x65, 0x62, 0x27,
0x9e, 0x4e, 0xfb, 0xc5, 0x5f, 0xac, 0x8d, 0xf5, 0xbd, 0x4f, 0xbb, 0x94, 0x1f, 0xf4, 0x3b, 0x2e,
0xfc, 0xea, 0x09, 0x35, 0x0c, 0x7a, 0xc2, 0x89, 0x76, 0xb0, 0xea, 0x69, 0x78, 0x5f, 0xa7, 0x8c,
0x3b, 0xb4, 0xd3, 0xe7, 0x44, 0x5f, 0x1d, 0x14, 0x81, 0x55, 0xa1, 0x76, 0x55, 0xa8, 0xed, 0x75,
0x3a, 0x05, 0xb1, 0xbc, 0xfb, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xe1, 0xdf, 0xec, 0xde, 0x9f,
0x17, 0x00, 0x00,
0x5b, 0x70, 0x98, 0x74, 0x60, 0x78, 0x82, 0x69, 0xe3, 0x4e, 0xc6, 0x0c, 0x0d, 0x41, 0x4e, 0xa7,
0x43, 0xa0, 0x63, 0x64, 0xe9, 0xe2, 0x5c, 0x6b, 0x49, 0xae, 0xee, 0x9c, 0x34, 0x79, 0x01, 0x66,
0x78, 0x64, 0xe0, 0x33, 0x30, 0x30, 0xc0, 0xf0, 0x81, 0x78, 0xe1, 0x05, 0xbe, 0x09, 0xa3, 0x93,
0xac, 0x48, 0xf2, 0x39, 0x72, 0xea, 0xa6, 0xe1, 0x4d, 0x77, 0xda, 0xdb, 0xdf, 0xee, 0x6f, 0xf7,
0xf6, 0x76, 0x61, 0xfe, 0xe9, 0x80, 0xb8, 0x87, 0x6d, 0x46, 0xdc, 0x7d, 0x6a, 0x90, 0x7a, 0xdf,
0x75, 0xb8, 0x83, 0x90, 0x45, 0x7b, 0xfb, 0x03, 0xe6, 0xaf, 0xea, 0x42, 0x42, 0x2d, 0x19, 0x8e,
0x65, 0x39, 0xb6, 0xbf, 0xa7, 0x96, 0xa2, 0x12, 0x6a, 0x99, 0xda, 0x9c, 0xb8, 0xb6, 0xde, 0x0b,
0xd6, 0xc8, 0xd4, 0xb9, 0x1e, 0xd7, 0x89, 0xbf, 0x81, 0x79, 0x8d, 0x74, 0x29, 0xe3, 0xc4, 0xdd,
0x74, 0x4c, 0xa2, 0x91, 0xa7, 0x03, 0xc2, 0x38, 0x7a, 0x0f, 0xf2, 0x1d, 0x9d, 0x91, 0x9a, 0x72,
0x55, 0x59, 0x99, 0x5b, 0xbb, 0x5c, 0x8f, 0x21, 0x07, 0x90, 0xf7, 0x59, 0xf7, 0xae, 0xce, 0x88,
0x26, 0x24, 0xd1, 0x07, 0x30, 0xab, 0x9b, 0xa6, 0x4b, 0x18, 0xab, 0x65, 0x4f, 0x38, 0x74, 0xc7,
0x97, 0xd1, 0x86, 0xc2, 0xf8, 0x27, 0x05, 0x16, 0xe2, 0x16, 0xb0, 0xbe, 0x63, 0x33, 0x82, 0x6e,
0x43, 0x81, 0x71, 0x9d, 0x0f, 0x58, 0x60, 0xc4, 0xab, 0x52, 0x7d, 0x2d, 0x21, 0xa2, 0x05, 0xa2,
0xe8, 0x2e, 0xcc, 0x51, 0x9b, 0xf2, 0x76, 0x5f, 0x77, 0x75, 0x6b, 0x68, 0xc9, 0x1b, 0xf1, 0x93,
0x21, 0x2b, 0x4d, 0x9b, 0xf2, 0x2d, 0x21, 0xa8, 0x01, 0x0d, 0xbf, 0xf1, 0x23, 0x58, 0x6c, 0xed,
0x39, 0x07, 0xeb, 0x4e, 0xaf, 0x47, 0x0c, 0x4e, 0x1d, 0xfb, 0xf9, 0x49, 0x41, 0x90, 0x37, 0x3b,
0xcd, 0x86, 0xb0, 0x23, 0xa7, 0x89, 0x6f, 0xcc, 0x60, 0x29, 0xa9, 0x7e, 0x1a, 0x8f, 0xdf, 0x82,
0x8b, 0x46, 0xa8, 0xaa, 0xd9, 0xf0, 0x7c, 0xce, 0xad, 0xe4, 0xb4, 0xf8, 0x26, 0xfe, 0x4e, 0x81,
0xc5, 0x4f, 0x1d, 0xdd, 0x3c, 0x23, 0xa7, 0x10, 0x86, 0x52, 0x14, 0xb0, 0x96, 0x13, 0xff, 0x62,
0x7b, 0xf8, 0x7b, 0x05, 0x6a, 0x1a, 0xe9, 0x11, 0x9d, 0x91, 0xf3, 0x34, 0xe3, 0x5b, 0x05, 0x16,
0xbc, 0x00, 0x6c, 0xe9, 0x2e, 0xa7, 0xe7, 0x63, 0x42, 0xdf, 0xcf, 0xb0, 0x88, 0x05, 0xd3, 0x64,
0x00, 0x86, 0x52, 0x7f, 0xa8, 0xe9, 0x38, 0x01, 0x62, 0x7b, 0xd8, 0x82, 0x4a, 0x88, 0xe6, 0x1d,
0x27, 0x0c, 0x5d, 0x85, 0xb9, 0x88, 0x88, 0x00, 0xcc, 0x69, 0xd1, 0x2d, 0xf4, 0x21, 0xcc, 0x78,
0x10, 0x44, 0xf8, 0x57, 0x5e, 0xc3, 0xf5, 0xd1, 0xfa, 0x53, 0x8f, 0x6b, 0xd5, 0xfc, 0x03, 0xf8,
0x37, 0x05, 0x96, 0x12, 0x78, 0x2f, 0x9d, 0xe5, 0x11, 0x5e, 0xf2, 0x12, 0x5e, 0xfe, 0x54, 0x60,
0x79, 0xc4, 0xd0, 0x69, 0x82, 0xb1, 0x03, 0x4b, 0x21, 0x40, 0xdb, 0x24, 0xcc, 0x70, 0x69, 0xdf,
0xfb, 0xf6, 0xc3, 0x32, 0xb7, 0xf6, 0x66, 0x3a, 0x89, 0x4c, 0x5b, 0x0c, 0x55, 0x34, 0x22, 0x1a,
0xf0, 0xaf, 0x0a, 0x2c, 0x78, 0x97, 0xf8, 0xfc, 0x32, 0x77, 0x22, 0x4e, 0x7f, 0x57, 0x60, 0x39,
0xb8, 0xe7, 0xff, 0x73, 0x4b, 0x7f, 0x56, 0x40, 0x5d, 0x77, 0x89, 0xce, 0xc9, 0xe7, 0x5e, 0x1c,
0xd6, 0xf7, 0x74, 0xdb, 0x26, 0xbd, 0xe9, 0x12, 0xe0, 0x3a, 0x54, 0x5c, 0xdf, 0xd9, 0xb6, 0xe1,
0xeb, 0x13, 0xa6, 0x17, 0xb5, 0x72, 0xb0, 0x1d, 0xa0, 0xa0, 0xb7, 0xa1, 0xec, 0x12, 0x36, 0xe8,
0x1d, 0xcb, 0xe5, 0x84, 0xdc, 0x45, 0x7f, 0x37, 0x10, 0xc3, 0xbf, 0x28, 0xb0, 0x7c, 0xc7, 0x34,
0xa3, 0x06, 0x4e, 0x71, 0x97, 0x6e, 0xc1, 0xa5, 0x84, 0x75, 0x01, 0xb5, 0x45, 0xad, 0x1a, 0xb7,
0xaf, 0xd9, 0x40, 0x37, 0xa0, 0x1a, 0xb7, 0x30, 0xa0, 0xba, 0xa8, 0x55, 0x62, 0x36, 0x36, 0x1b,
0xf8, 0x6f, 0x05, 0x54, 0x8d, 0x58, 0xce, 0x3e, 0x91, 0x1a, 0xfa, 0x5c, 0x4c, 0x0e, 0xbd, 0xcb,
0x4e, 0xe7, 0x5d, 0xee, 0x14, 0xde, 0xe5, 0xe5, 0xde, 0x3d, 0x86, 0xa5, 0x87, 0x3a, 0x37, 0xf6,
0x1a, 0xd6, 0xf4, 0x11, 0xb8, 0x02, 0x10, 0xe2, 0xf9, 0x45, 0xa1, 0xa8, 0x45, 0x76, 0xf0, 0x1f,
0x59, 0x40, 0xde, 0x25, 0x6f, 0x91, 0xae, 0x45, 0x6c, 0xfe, 0xf2, 0x2f, 0x4e, 0xe2, 0x5d, 0xc8,
0x8f, 0xbe, 0x0b, 0x57, 0x00, 0x98, 0x6f, 0x9d, 0xe7, 0xc2, 0x8c, 0xb8, 0x58, 0x91, 0x1d, 0xa4,
0xc2, 0x85, 0x5d, 0x4a, 0x7a, 0xa6, 0xf7, 0xb7, 0x20, 0xfe, 0x86, 0x6b, 0xf4, 0x09, 0x94, 0x03,
0xc9, 0xb6, 0x78, 0x2a, 0x58, 0x6d, 0x56, 0x56, 0x17, 0xbd, 0x4e, 0xb5, 0x1e, 0x50, 0x20, 0x8a,
0x62, 0xd3, 0xde, 0x75, 0xb4, 0x8b, 0x2c, 0xb2, 0xc3, 0xf0, 0xbf, 0x0a, 0x2c, 0x06, 0x85, 0xe6,
0xdc, 0xd8, 0x9a, 0xa0, 0xcc, 0x4c, 0xc3, 0x17, 0xfe, 0x51, 0x81, 0xe5, 0x75, 0xc7, 0xea, 0x3b,
0x76, 0xe8, 0xf7, 0x74, 0xf5, 0xe9, 0x23, 0xff, 0x10, 0x19, 0x36, 0xc7, 0xd7, 0xc6, 0x34, 0xc7,
0x49, 0xd0, 0xe0, 0x14, 0xfe, 0x47, 0x81, 0xb9, 0x80, 0x6d, 0x2f, 0x26, 0xe8, 0x32, 0x14, 0x43,
0x57, 0x82, 0x26, 0xe2, 0x78, 0x63, 0x84, 0xc2, 0x6c, 0x7a, 0xc2, 0xe5, 0x46, 0x13, 0xee, 0x15,
0xb8, 0x60, 0x11, 0xab, 0xcd, 0xe8, 0x11, 0x09, 0xf2, 0x71, 0xd6, 0x22, 0x56, 0x8b, 0x1e, 0x11,
0xef, 0x97, 0x3d, 0xb0, 0xda, 0xae, 0x73, 0xe0, 0x31, 0x2b, 0x7e, 0xd9, 0x03, 0x4b, 0x73, 0x0e,
0x18, 0x7a, 0x0d, 0x80, 0xda, 0x26, 0x79, 0xd6, 0xb6, 0x75, 0x8b, 0xd4, 0x0a, 0xe2, 0x6a, 0x17,
0xc5, 0xce, 0xa6, 0x6e, 0x11, 0x54, 0x83, 0x59, 0xb1, 0x68, 0x36, 0x6a, 0xb3, 0xfe, 0xc1, 0x60,
0x89, 0x77, 0x01, 0x45, 0x3c, 0x9c, 0xea, 0xaa, 0x47, 0xe2, 0x9e, 0x4d, 0xc6, 0xdd, 0x6b, 0xca,
0xe7, 0x63, 0x40, 0xd3, 0xc4, 0xf5, 0x7d, 0x98, 0xa1, 0xf6, 0xae, 0x33, 0xec, 0x33, 0x5e, 0x97,
0xf5, 0x19, 0x51, 0x30, 0x5f, 0xfa, 0xe6, 0x11, 0x94, 0xe3, 0xdd, 0x07, 0x2a, 0xc1, 0x85, 0x4d,
0x87, 0xdf, 0x7b, 0x46, 0x19, 0xaf, 0x66, 0x50, 0x19, 0x60, 0xd3, 0xe1, 0x5b, 0x2e, 0x61, 0xc4,
0xe6, 0x55, 0x05, 0x01, 0x14, 0x3e, 0xb3, 0x1b, 0x94, 0x3d, 0xa9, 0x66, 0xd1, 0x7c, 0xd0, 0x54,
0xea, 0xbd, 0xa6, 0x7d, 0x9f, 0x58, 0x8e, 0x7b, 0x58, 0xcd, 0x79, 0xc7, 0xc3, 0x55, 0x1e, 0x55,
0xa1, 0x14, 0x8a, 0x6c, 0x6c, 0x3d, 0xa8, 0xce, 0xa0, 0x22, 0xcc, 0xf8, 0x9f, 0x85, 0xb5, 0x1f,
0x00, 0x4a, 0xe2, 0xb9, 0x68, 0xf9, 0x23, 0x29, 0x32, 0xa0, 0x14, 0x1d, 0x05, 0xd1, 0x75, 0x99,
0x13, 0x92, 0x71, 0x55, 0x5d, 0x49, 0x17, 0xf4, 0xb9, 0xc5, 0x19, 0xf4, 0x18, 0x2a, 0xf1, 0xf9,
0x8b, 0xa1, 0x1b, 0x52, 0xb2, 0x64, 0x33, 0xa0, 0x7a, 0x73, 0x12, 0xd1, 0x10, 0xab, 0x0b, 0xe5,
0x58, 0xa3, 0xcf, 0xd0, 0xca, 0xb8, 0xf3, 0xc9, 0x56, 0x49, 0xbd, 0x31, 0x81, 0x64, 0x08, 0xf4,
0x05, 0x94, 0x63, 0x9d, 0xe1, 0x18, 0x20, 0x59, 0xf7, 0xa8, 0x9e, 0x94, 0x5e, 0x38, 0x83, 0xda,
0x70, 0x29, 0xd9, 0xcd, 0x31, 0x74, 0x4b, 0x4e, 0xb8, 0xb4, 0xe9, 0x4b, 0x03, 0xd8, 0xf1, 0x6d,
0x3f, 0x26, 0x50, 0x1e, 0x0f, 0xe9, 0xf8, 0x9a, 0xa6, 0xfb, 0xeb, 0xd0, 0xf8, 0x88, 0xfa, 0x77,
0x4e, 0x30, 0xfe, 0xd4, 0x08, 0x1d, 0x40, 0xa3, 0x2d, 0x24, 0x52, 0xa5, 0x87, 0xee, 0x59, 0x7d,
0x7e, 0xa8, 0xd6, 0x65, 0xf0, 0xe3, 0xdb, 0x50, 0x9c, 0x41, 0x0f, 0x01, 0x6d, 0x10, 0xbe, 0x4d,
0x2d, 0xb2, 0x4d, 0x8d, 0x27, 0x93, 0x60, 0x24, 0x9e, 0xd3, 0x60, 0xd1, 0xe2, 0x2e, 0xb5, 0xbb,
0xb1, 0xb4, 0x59, 0xd8, 0x20, 0xa2, 0xc2, 0x53, 0xc6, 0xa9, 0xc1, 0x5e, 0xa0, 0x6a, 0x47, 0xd8,
0x9c, 0x1c, 0x3a, 0x6f, 0x4e, 0x32, 0xfe, 0x04, 0xc4, 0xdf, 0x9a, 0x48, 0x36, 0x04, 0xdc, 0x11,
0x80, 0x89, 0x67, 0xeb, 0x44, 0x4f, 0x26, 0x7c, 0xfa, 0x70, 0x06, 0x19, 0x50, 0xf6, 0x78, 0x8a,
0x3c, 0x7b, 0xd7, 0xd2, 0xea, 0x6b, 0xe0, 0xc4, 0xf5, 0x54, 0xb9, 0xa1, 0x03, 0x6b, 0x7f, 0x15,
0xa0, 0x28, 0x12, 0x40, 0xd4, 0xbe, 0x33, 0x8b, 0xf9, 0x36, 0x54, 0x82, 0x98, 0xbf, 0xc8, 0x70,
0xb7, 0x4f, 0xcd, 0xbe, 0x34, 0xbc, 0x63, 0x5a, 0x1d, 0x9c, 0x41, 0x8f, 0xa0, 0x92, 0x18, 0x83,
0xe4, 0x45, 0x68, 0xcc, 0xac, 0x94, 0x76, 0x8d, 0x0d, 0x40, 0xa3, 0xf3, 0x0b, 0xaa, 0xcb, 0x2b,
0xc5, 0xb8, 0x39, 0x27, 0x0d, 0xe4, 0x2b, 0xa8, 0x24, 0xe6, 0x08, 0xf9, 0x85, 0x90, 0x0f, 0x1b,
0x69, 0xda, 0x1f, 0x40, 0x29, 0x32, 0x38, 0x30, 0x79, 0x8a, 0x8e, 0x8e, 0x16, 0x69, 0x6a, 0xbf,
0x84, 0x4a, 0xbc, 0xc9, 0x1e, 0xf3, 0x5e, 0x4a, 0x3b, 0xf1, 0x74, 0xda, 0xcf, 0xfe, 0x62, 0xdd,
0xbd, 0xb3, 0xf3, 0x71, 0x97, 0xf2, 0xbd, 0x41, 0xc7, 0x83, 0x5f, 0x3d, 0xa2, 0xbd, 0x1e, 0x3d,
0xe2, 0xc4, 0xd8, 0x5b, 0xf5, 0x35, 0xbc, 0x6b, 0x52, 0xc6, 0x5d, 0xda, 0x19, 0x70, 0x62, 0xae,
0x0e, 0x8b, 0xc0, 0xaa, 0x50, 0xbb, 0x2a, 0xd4, 0xf6, 0x3b, 0x9d, 0x82, 0x58, 0xde, 0xfe, 0x2f,
0x00, 0x00, 0xff, 0xff, 0x36, 0xce, 0x45, 0x9b, 0x98, 0x17, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
......
......@@ -132,11 +132,63 @@ func (node *NodeImpl) HasCollection(request *milvuspb.HasCollectionRequest) (*mi
}
func (node *NodeImpl) LoadCollection(request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
panic("implement me")
log.Println("load collection: ", request)
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
lct := &LoadCollectionTask{
Condition: NewTaskCondition(ctx),
LoadCollectionRequest: request,
queryserviceClient: node.queryServiceClient,
}
err := node.sched.DdQueue.Enqueue(lct)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = lct.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return lct.result, nil
}
func (node *NodeImpl) ReleaseCollection(request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
panic("implement me")
log.Println("release collection: ", request)
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
rct := &ReleaseCollectionTask{
Condition: NewTaskCondition(ctx),
ReleaseCollectionRequest: request,
queryserviceClient: node.queryServiceClient,
}
err := node.sched.DdQueue.Enqueue(rct)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = rct.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return rct.result, nil
}
func (node *NodeImpl) DescribeCollection(request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
......@@ -332,11 +384,63 @@ func (node *NodeImpl) HasPartition(request *milvuspb.HasPartitionRequest) (*milv
}
func (node *NodeImpl) LoadPartitions(request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
panic("implement me")
log.Println("load partitions: ", request)
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
lpt := &LoadPartitionTask{
Condition: NewTaskCondition(ctx),
LoadPartitonRequest: request,
queryserviceClient: node.queryServiceClient,
}
err := node.sched.DdQueue.Enqueue(lpt)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = lpt.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return lpt.result, nil
}
func (node *NodeImpl) ReleasePartitions(request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
panic("implement me")
log.Println("load partitions: ", request)
ctx, cancel := context.WithTimeout(context.Background(), reqTimeoutInterval)
defer cancel()
rpt := &ReleasePartitionTask{
Condition: NewTaskCondition(ctx),
ReleasePartitionRequest: request,
queryserviceClient: node.queryServiceClient,
}
err := node.sched.DdQueue.Enqueue(rpt)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
err = rpt.WaitToFinish()
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, nil
}
return rpt.result, nil
}
func (node *NodeImpl) GetPartitionStatistics(request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
......
package proxynode
import (
"context"
"errors"
"log"
"math"
......@@ -1519,7 +1518,6 @@ type LoadCollectionTask struct {
*milvuspb.LoadCollectionRequest
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
}
func (lct *LoadCollectionTask) OnEnqueue() error {
......@@ -1592,7 +1590,6 @@ type ReleaseCollectionTask struct {
*milvuspb.ReleaseCollectionRequest
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
}
func (rct *ReleaseCollectionTask) OnEnqueue() error {
......@@ -1665,7 +1662,6 @@ type LoadPartitionTask struct {
*milvuspb.LoadPartitonRequest
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
}
func (lpt *LoadPartitionTask) OnEnqueue() error {
......@@ -1747,7 +1743,6 @@ type ReleasePartitionTask struct {
*milvuspb.ReleasePartitionRequest
queryserviceClient QueryServiceClient
result *commonpb.Status
ctx context.Context
}
func (rpt *ReleasePartitionTask) OnEnqueue() error {
......
......@@ -445,18 +445,20 @@ func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.S
}
// segments are ordered before LoadSegments calling
if in.LastSegmentState.State == commonpb.SegmentState_SegmentGrowing {
segmentNum := len(segmentIDs)
position := in.LastSegmentState.StartPosition
err = node.loadService.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
for i, state := range in.SegmentStates {
if state.State == commonpb.SegmentState_SegmentGrowing {
position := state.StartPosition
err = node.loadService.seekSegment(position)
if err != nil {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
return status, err
}
return status, err
segmentIDs = segmentIDs[:i]
break
}
segmentIDs = segmentIDs[:segmentNum-1]
}
err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
......
......@@ -13,6 +13,9 @@ type metaReplica interface {
loadPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error)
updatePartitionState(dbID UniqueID, collectionID UniqueID, partitionID UniqueID, state querypb.PartitionState) error
getPartitionStates(dbID UniqueID, collectionID UniqueID, partitionIDs []UniqueID) ([]*querypb.PartitionStates, error)
releaseCollection(dbID UniqueID, collectionID UniqueID) error
releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error
addDmChannels(collectionID UniqueID, channels []string) error
}
type segment struct {
......@@ -26,9 +29,9 @@ type partition struct {
}
type collection struct {
id UniqueID
partitions map[UniqueID]*partition
node2channel map[int][]string
id UniqueID
partitions map[UniqueID]*partition
dmChannelNames []string
}
type metaReplicaImpl struct {
......@@ -48,18 +51,18 @@ func newMetaReplica() metaReplica {
}
func (mp *metaReplicaImpl) addCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
//TODO:: assert dbID = 0 exist
if _, ok := mp.db2collections[dbID]; ok {
partitions := make(map[UniqueID]*partition)
node2channel := make(map[int][]string)
newCollection := &collection{
id: collectionID,
partitions: partitions,
node2channel: node2channel,
id: collectionID,
partitions: partitions,
}
mp.db2collections[dbID] = append(mp.db2collections[dbID], newCollection)
return newCollection, nil
}
return nil, errors.New("can't find dbID when add collection")
return nil, errors.New("addCollection: can't find dbID when add collection")
}
func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) (*partition, error) {
......@@ -78,7 +81,7 @@ func (mp *metaReplicaImpl) addPartition(dbID UniqueID, collectionID UniqueID, pa
}
}
}
return nil, errors.New("can't find collection when add partition")
return nil, errors.New("addPartition: can't find collection when add partition")
}
func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error) {
......@@ -86,7 +89,7 @@ func (mp *metaReplicaImpl) getCollections(dbID UniqueID) ([]*collection, error)
return collections, nil
}
return nil, errors.New("can't find collectionID")
return nil, errors.New("getCollections: can't find collectionID")
}
func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) ([]*partition, error) {
......@@ -102,7 +105,7 @@ func (mp *metaReplicaImpl) getPartitions(dbID UniqueID, collectionID UniqueID) (
}
}
return nil, errors.New("can't find partitionIDs")
return nil, errors.New("getPartitions: can't find partitionIDs")
}
func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) ([]*segment, error) {
......@@ -119,7 +122,7 @@ func (mp *metaReplicaImpl) getSegments(dbID UniqueID, collectionID UniqueID, par
}
}
}
return nil, errors.New("can't find segmentID")
return nil, errors.New("getSegments: can't find segmentID")
}
func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID) (*collection, error) {
......@@ -127,14 +130,16 @@ func (mp *metaReplicaImpl) loadCollection(dbID UniqueID, collectionID UniqueID)
if collections, err := mp.getCollections(dbID); err == nil {
for _, collection := range collections {
if collectionID == collection.id {
return res, nil
res = collection
}
}
} else {
res, err = mp.addCollection(dbID, collectionID)
}
if res == nil {
collection, err := mp.addCollection(dbID, collectionID)
if err != nil {
return nil, err
}
res = collection
}
return res, nil
}
......@@ -177,7 +182,7 @@ func (mp *metaReplicaImpl) updatePartitionState(dbID UniqueID,
}
}
}
return errors.New("update partition state fail")
return errors.New("updatePartitionState: update partition state fail")
}
func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
......@@ -203,3 +208,54 @@ func (mp *metaReplicaImpl) getPartitionStates(dbID UniqueID,
}
return partitionStates, nil
}
func (mp *metaReplicaImpl) releaseCollection(dbID UniqueID, collectionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for i, collection := range collections {
if collectionID == collection.id {
collections = append(collections[:i], collections[i+1:]...)
return nil
}
}
}
return errors.New("releaseCollection: can't find dbID or collectionID")
}
func (mp *metaReplicaImpl) releasePartition(dbID UniqueID, collectionID UniqueID, partitionID UniqueID) error {
if collections, ok := mp.db2collections[dbID]; ok {
for _, collection := range collections {
if collectionID == collection.id {
if _, ok := collection.partitions[partitionID]; ok {
delete(collection.partitions, partitionID)
return nil
}
}
}
}
return errors.New("releasePartition: can't find dbID or collectionID or partitionID")
}
func (mp *metaReplicaImpl) addDmChannels(collectionID UniqueID, channels []string) error {
//TODO :: use dbID
if collections, ok := mp.db2collections[0]; ok {
for _, collection := range collections {
if collectionID == collection.id {
dmChannels := collection.dmChannelNames
for _, channel := range channels {
match := false
for _, existedChannel := range dmChannels {
if channel == existedChannel {
match = true
break
}
}
if !match {
dmChannels = append(dmChannels, channel)
}
}
return nil
}
}
}
return errors.New("addDmChannels: can't find dbID or collectionID")
}
......@@ -8,8 +8,6 @@ import (
type queryNodeInfo struct {
client QueryNodeInterface
insertChannels string
nodeID uint64
segments []UniqueID
dmChannelNames []string
}
......@@ -25,3 +23,21 @@ func (qn *queryNodeInfo) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb
func (qn *queryNodeInfo) GetSegmentInfo(in *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
return qn.client.GetSegmentInfo(in)
}
func (qn *queryNodeInfo) WatchDmChannels(in *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
return qn.client.WatchDmChannels(in)
}
func (qn *queryNodeInfo) AddDmChannels(channels []string) {
qn.dmChannelNames = append(qn.dmChannelNames, channels...)
}
func newQueryNodeInfo(client QueryNodeInterface) *queryNodeInfo {
segments := make([]UniqueID, 0)
dmChannelNames := make([]string, 0)
return &queryNodeInfo{
client: client,
segments: segments,
dmChannelNames: dmChannelNames,
}
}
......@@ -3,7 +3,6 @@ package queryservice
import (
"context"
"fmt"
"log"
"sort"
"strconv"
"sync/atomic"
......@@ -25,6 +24,7 @@ type MasterServiceInterface interface {
type DataServiceInterface interface {
GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error)
}
type QueryNodeInterface interface {
......@@ -47,7 +47,7 @@ type QueryService struct {
dataServiceClient DataServiceInterface
masterServiceClient MasterServiceInterface
queryNodes []*queryNodeInfo
queryNodes map[UniqueID]*queryNodeInfo
numRegisterNode uint64
numQueryChannel uint64
......@@ -87,7 +87,7 @@ func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, erro
componentStates, err := node.GetComponentStates()
if err != nil {
subComponentInfos = append(subComponentInfos, &internalpb2.ComponentInfo{
NodeID: int64(nodeID),
NodeID: nodeID,
StateCode: internalpb2.StateCode_ABNORMAL,
})
continue
......@@ -111,28 +111,21 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) {
return Params.StatsChannelName, nil
}
// TODO:: do addWatchDmChannel to query node after registerNode
func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
fmt.Println("register query node =", req.Address)
// TODO:: add mutex
allocatedID := uint64(len(qs.queryNodes))
allocatedID := len(qs.queryNodes)
registerNodeAddress := req.Address.Ip + ":" + strconv.FormatInt(req.Address.Port, 10)
var node *queryNodeInfo
if qs.enableGrpc {
client := nodeclient.NewClient(registerNodeAddress)
node = &queryNodeInfo{
client: client,
nodeID: allocatedID,
}
node = newQueryNodeInfo(client)
} else {
client := querynode.NewQueryNode(qs.loopCtx, allocatedID)
node = &queryNodeInfo{
client: client,
nodeID: allocatedID,
}
client := querynode.NewQueryNode(qs.loopCtx, uint64(allocatedID))
node = newQueryNodeInfo(client)
}
qs.queryNodes = append(qs.queryNodes, node)
qs.queryNodes[UniqueID(allocatedID)] = node
//TODO::return init params to queryNode
return &querypb.RegisterNodeResponse{
......@@ -186,10 +179,20 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
if err != nil {
return fn(err), err
}
if collection == nil {
if len(collection.dmChannelNames) != 0 {
return fn(nil), nil
}
channelRequest := datapb.InsertChannelRequest{
DbID: req.DbID,
CollectionID: req.CollectionID,
}
dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if err != nil {
return fn(err), err
}
// get partitionIDs
showPartitionRequest := &milvuspb.ShowPartitionRequest{
Base: &commonpb.MsgBase{
......@@ -207,6 +210,21 @@ func (qs *QueryService) LoadCollection(req *querypb.LoadCollectionRequest) (*com
}
partitionIDs := showPartitionResponse.PartitionIDs
if len(partitionIDs) == 0 {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), err
}
}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
return fn(err), err
}
loadPartitionsRequest := &querypb.LoadPartitionRequest{
Base: req.Base,
DbID: dbID,
......@@ -244,6 +262,14 @@ func (qs *QueryService) ReleaseCollection(req *querypb.ReleaseCollectionRequest)
status, err := qs.ReleasePartitions(releasePartitionRequest)
err = qs.replica.releaseCollection(dbID, collectionID)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, err
}
//TODO:: queryNode cancel subscribe dmChannels
return status, err
}
......@@ -276,16 +302,40 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
qs.replica.loadPartition(dbID, collectionID, partitionIDs[0])
fn := func(err error) *commonpb.Status {
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
// get segments and load segment to query node
if len(partitionIDs) == 0 {
err := errors.New("partitionIDs are empty")
return fn(err), err
}
var collection *collection = nil
var err error
if collection, err = qs.replica.loadCollection(dbID, collectionID); err != nil {
return fn(err), err
}
for _, partitionID := range partitionIDs {
partition, err := qs.replica.loadPartition(dbID, collectionID, partitionID)
if err != nil {
return fn(err), err
}
if partition == nil {
return fn(err), nil
}
showSegmentRequest := &milvuspb.ShowSegmentRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_kShowSegment,
......@@ -297,92 +347,110 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm
if err != nil {
return fn(err), err
}
if showSegmentResponse.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Fatal("showSegment fail, v%", showSegmentResponse.Status.Reason)
}
segmentIDs := showSegmentResponse.SegmentIDs
segmentStates := make(map[UniqueID]*datapb.SegmentStateInfo)
channel2id := make(map[string]int)
//id2channels := make(map[int][]string)
id2segs := make(map[int][]UniqueID)
offset := 0
channel2segs := make(map[string][]UniqueID)
resp, err := qs.dataServiceClient.GetSegmentStates(&datapb.SegmentStatesRequest{
SegmentIDs: segmentIDs,
})
if err != nil {
log.Fatal("get segment states fail")
return fn(err), err
}
for _, state := range resp.States {
segmentID := state.SegmentID
segmentStates[segmentID] = state
var flatChannelName string
// channelNames := make([]string, 0)
// for i, str := range state.StartPositions {
// flatChannelName += str.ChannelName
// channelNames = append(channelNames, str.ChannelName)
// if i+1 < len(state.StartPositions) {
// flatChannelName += "/"
// }
// }
if flatChannelName == "" {
log.Fatal("segmentState's channel name is empty")
}
if _, ok := channel2id[flatChannelName]; !ok {
channel2id[flatChannelName] = offset
//id2channels[offset] = channelNames
id2segs[offset] = make([]UniqueID, 0)
id2segs[offset] = append(id2segs[offset], segmentID)
offset++
channelName := state.StartPosition.ChannelName
if _, ok := channel2segs[channelName]; !ok {
segments := make([]UniqueID, 0)
segments = append(segments, segmentID)
channel2segs[channelName] = segments
} else {
//TODO::check channel name
id := channel2id[flatChannelName]
id2segs[id] = append(id2segs[id], segmentID)
channel2segs[channelName] = append(channel2segs[channelName], segmentID)
}
}
for key, value := range id2segs {
sort.Slice(value, func(i, j int) bool { return segmentStates[value[i]].CreateTime < segmentStates[value[j]].CreateTime })
selectedSegs := make([]UniqueID, 0)
for i, v := range value {
if segmentStates[v].State == commonpb.SegmentState_SegmentFlushed {
selectedSegs = append(selectedSegs, v)
} else {
if i > 0 && segmentStates[selectedSegs[i-1]].State != commonpb.SegmentState_SegmentFlushed {
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
for channel, segmentIDs := range channel2segs {
sort.Slice(segmentIDs, func(i, j int) bool {
return segmentStates[segmentIDs[i]].StartPosition.Timestamp < segmentStates[segmentIDs[j]].StartPosition.Timestamp
})
var channelLoadDone = false
for _, node := range qs.queryNodes {
channels2node := node.dmChannelNames
for _, ch := range channels2node {
if channel == ch {
channelLoadDone = true
break
}
selectedSegs = append(selectedSegs, v)
}
if channelLoadDone {
break
}
}
if !channelLoadDone {
states := make([]*datapb.SegmentStateInfo, 0)
for _, id := range segmentIDs {
states = append(states, segmentStates[id])
}
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
SegmentIDs: segmentIDs,
SegmentStates: states,
}
dmChannels := []string{channel}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
if err != nil {
return fn(err), err
}
queryNode := qs.queryNodes[nodeIDs[0]]
//TODO:: seek when loadSegment may cause more msgs consumed
status, err := queryNode.LoadSegments(loadSegmentRequest)
if err != nil {
return status, err
}
}
id2segs[key] = selectedSegs
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_PartialInMemory)
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
}
// TODO:: filter channel for query node
for channels, i := range channel2id {
for key, node := range qs.queryNodes {
if channels == node.insertChannels {
statesID := id2segs[i][len(id2segs[i])-1]
//TODO :: should be start position
// position := segmentStates[statesID-1].StartPositions
// segmentStates[statesID].StartPositions = position
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
SegmentIDs: id2segs[i],
LastSegmentState: segmentStates[statesID],
}
status, err := qs.queryNodes[key].LoadSegments(loadSegmentRequest)
if err != nil {
return status, err
}
if len(collection.dmChannelNames) == 0 {
channelRequest := datapb.InsertChannelRequest{
DbID: dbID,
CollectionID: collectionID,
}
dmChannels, err := qs.dataServiceClient.GetInsertChannels(&channelRequest)
if err != nil {
return fn(err), err
}
for _, partitionID := range partitionIDs {
loadSegmentRequest := &querypb.LoadSegmentRequest{
CollectionID: collectionID,
PartitionID: partitionID,
}
for _, node := range qs.queryNodes {
_, err := node.LoadSegments(loadSegmentRequest)
if err != nil {
return fn(err), nil
}
}
nodeIDs := qs.shuffleChannelsToQueryNode(dmChannels)
err = qs.watchDmChannels(dmChannels, nodeIDs, collection)
}
if err != nil {
return fn(err), err
}
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
......@@ -407,6 +475,13 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
}
segmentIDs = append(segmentIDs, res...)
err = qs.replica.releasePartition(dbID, collectionID, partitionID)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: err.Error(),
}, err
}
}
releaseSegmentRequest := &querypb.ReleaseSegmentRequest{
Base: req.Base,
......@@ -423,6 +498,7 @@ func (qs *QueryService) ReleasePartitions(req *querypb.ReleasePartitionRequest)
}
}
//TODO:: queryNode cancel subscribe dmChannels
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}, nil
......@@ -486,14 +562,14 @@ func (qs *QueryService) GetSegmentInfo(req *querypb.SegmentInfoRequest) (*queryp
}
func NewQueryService(ctx context.Context) (*QueryService, error) {
nodes := make([]*queryNodeInfo, 0)
nodes := make(map[UniqueID]*queryNodeInfo)
ctx1, cancel := context.WithCancel(ctx)
replica := newMetaReplica()
service := &QueryService{
loopCtx: ctx1,
loopCancel: cancel,
queryNodes: nodes,
replica: replica,
queryNodes: nodes,
numRegisterNode: 0,
numQueryChannel: 0,
enableGrpc: false,
......@@ -514,3 +590,77 @@ func (qs *QueryService) SetDataService(dataService DataServiceInterface) {
func (qs *QueryService) SetEnableGrpc(en bool) {
qs.enableGrpc = en
}
func (qs *QueryService) watchDmChannels(dmChannels []string, assignedNodeIDs []UniqueID, collection *collection) error {
err := qs.replica.addDmChannels(collection.id, dmChannels)
if err != nil {
return err
}
node2channels := make(map[UniqueID][]string)
for i, channel := range dmChannels {
nodeID := assignedNodeIDs[i]
findChannel := false
for _, ch := range collection.dmChannelNames {
if channel == ch {
findChannel = true
}
}
if !findChannel {
if _, ok := node2channels[nodeID]; ok {
node2channels[nodeID] = append(node2channels[nodeID], channel)
} else {
channels := make([]string, 0)
channels = append(channels, channel)
node2channels[nodeID] = channels
}
}
}
for nodeID, channels := range node2channels {
node := qs.queryNodes[nodeID]
request := &querypb.WatchDmChannelsRequest{
ChannelIDs: channels,
}
_, err := node.WatchDmChannels(request)
node.AddDmChannels(channels)
if err != nil {
return err
}
}
return nil
}
func (qs *QueryService) shuffleChannelsToQueryNode(dmChannels []string) []UniqueID {
maxNumDMChannel := 0
res := make([]UniqueID, 0)
node2lens := make(map[UniqueID]int)
for id, node := range qs.queryNodes {
node2lens[id] = len(node.dmChannelNames)
}
offset := 0
for {
lastOffset := offset
for id, len := range node2lens {
if len >= maxNumDMChannel {
maxNumDMChannel = len
} else {
res = append(res, id)
node2lens[id]++
offset++
}
}
if lastOffset == offset {
for id := range node2lens {
res = append(res, id)
node2lens[id]++
offset++
break
}
}
if offset == len(dmChannels) {
break
}
}
return res
}
......@@ -155,6 +155,9 @@ func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datap
return ret, nil
}
func (data *dataMock) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
return []string{"test-insert"}, nil
}
func TestQueryService_Init(t *testing.T) {
service, err := NewQueryService(context.Background())
......
......@@ -63,13 +63,19 @@ func (producer *MsgProducer) broadcastMsg() {
func (producer *MsgProducer) Start(ctx context.Context) {
producer.ctx, producer.cancel = context.WithCancel(ctx)
producer.wg.Add(1 + len(producer.watchers))
producer.wg.Add(2 + len(producer.watchers))
go producer.startTTBarrier()
for _, watcher := range producer.watchers {
go producer.startWatcher(watcher)
}
go producer.broadcastMsg()
}
func (producer *MsgProducer) startTTBarrier() {
defer producer.wg.Done()
producer.ttBarrier.StartBackgroundLoop(producer.ctx)
}
func (producer *MsgProducer) startWatcher(watcher TimeTickWatcher) {
defer producer.wg.Done()
watcher.StartBackgroundLoop(producer.ctx)
......
......@@ -18,7 +18,7 @@ type (
TimeTickBarrier interface {
GetTimeTick() (Timestamp, error)
StartBackgroundLoop()
StartBackgroundLoop(ctx context.Context)
}
softTimeTickBarrier struct {
......@@ -38,7 +38,7 @@ type (
}
)
func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
func NewSoftTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID, minTtInterval Timestamp) *softTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!\n")
return nil
......@@ -49,7 +49,6 @@ func NewSoftTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds
sttbarrier.ttStream = ttStream
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.peer2LastTt = make(map[UniqueID]Timestamp)
sttbarrier.ctx = ctx
for _, id := range peerIds {
sttbarrier.peer2LastTt[id] = Timestamp(0)
}
......@@ -80,11 +79,12 @@ func (ttBarrier *softTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop() {
func (ttBarrier *softTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
ttBarrier.ctx = ctx
for {
select {
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
case <-ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ctx.Err())
return
default:
}
......@@ -137,13 +137,14 @@ func (ttBarrier *hardTimeTickBarrier) GetTimeTick() (Timestamp, error) {
}
}
func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop() {
func (ttBarrier *hardTimeTickBarrier) StartBackgroundLoop(ctx context.Context) {
ttBarrier.ctx = ctx
// Last timestamp synchronized
state := Timestamp(0)
for {
select {
case <-ttBarrier.ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ttBarrier.ctx.Err())
case <-ctx.Done():
log.Printf("[TtBarrierStart] %s\n", ctx.Err())
return
default:
}
......@@ -187,7 +188,7 @@ func (ttBarrier *hardTimeTickBarrier) minTimestamp() Timestamp {
return tempMin
}
func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
func NewHardTimeTickBarrier(ttStream ms.MsgStream, peerIds []UniqueID) *hardTimeTickBarrier {
if len(peerIds) <= 0 {
log.Printf("[newSoftTimeTickBarrier] Error: peerIds is empty!")
return nil
......@@ -198,7 +199,6 @@ func NewHardTimeTickBarrier(ctx context.Context, ttStream ms.MsgStream, peerIds
sttbarrier.outTt = make(chan Timestamp, 1024)
sttbarrier.peer2Tt = make(map[UniqueID]Timestamp)
sttbarrier.ctx = ctx
for _, id := range peerIds {
sttbarrier.peer2Tt[id] = Timestamp(0)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册