From 90d471b133a6f5abf7d1170277748274a0ef9e6b Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Tue, 2 Feb 2021 09:52:42 +0800 Subject: [PATCH] Refactor main of queryservice and querynode Signed-off-by: XuanYang-cn --- Makefile | 2 + cmd/distributed/components/query_node.go | 231 +++++++++++++++++- cmd/distributed/components/query_service.go | 131 +++++++++- cmd/querynode/querynode.go | 40 +-- cmd/queryservice/queryservice.go | 17 +- internal/distributed/datanode/service.go | 12 +- .../distributed/querynode/client/client.go | 52 ++-- internal/distributed/querynode/service.go | 105 ++++---- .../distributed/queryservice/client/client.go | 73 ++++-- internal/distributed/queryservice/service.go | 13 +- 10 files changed, 536 insertions(+), 140 deletions(-) diff --git a/Makefile b/Makefile index f814f645f..c629cf89f 100644 --- a/Makefile +++ b/Makefile @@ -122,6 +122,8 @@ build-go: build-cpp @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxynode $(PWD)/cmd/proxy/node/proxy_node.go 1>/dev/null @echo "Building query service ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null + @echo "Building query node ..." + @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null @echo "Building binlog ..." @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null @echo "Building singlenode ..." diff --git a/cmd/distributed/components/query_node.go b/cmd/distributed/components/query_node.go index ffbb154fd..baa96e83a 100644 --- a/cmd/distributed/components/query_node.go +++ b/cmd/distributed/components/query_node.go @@ -2,19 +2,238 @@ package components import ( "context" + "fmt" + "log" + "time" + + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + qns "github.com/zilliztech/milvus-distributed/internal/distributed/querynode" + qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" + + ds "github.com/zilliztech/milvus-distributed/internal/dataservice" + is "github.com/zilliztech/milvus-distributed/internal/indexservice" + ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + qs "github.com/zilliztech/milvus-distributed/internal/queryservice" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) -func NewQueryNode(ctx context.Context) (*QueryNode, error) { - return nil, nil +type QueryNode struct { + ctx context.Context + svr *qns.Server + + dataService *dsc.Client + masterService *msc.GrpcClient + indexService *isc.Client + queryService *qsc.Client } -type QueryNode struct { +func NewQueryNode(ctx context.Context) (*QueryNode, error) { + const retry = 10 + const interval = 500 + + svr, err := qns.NewServer(ctx) + if err != nil { + panic(err) + } + + // --- QueryService --- + qs.Params.Init() + log.Println("QueryService address:", qs.Params.Address) + log.Println("Init Query service client ...") + queryService, err := qsc.NewClient(qs.Params.Address, 20*time.Second) + if err != nil { + panic(err) + } + + if err = queryService.Init(); err != nil { + panic(err) + } + + if err = queryService.Start(); err != nil { + panic(err) + } + + var cnt int + for cnt = 0; cnt < retry; cnt++ { + if cnt != 0 { + log.Println("Query service isn't ready ...") + log.Printf("Retrying getting query service's states in ... %v ms", interval) + } + + qsStates, err := queryService.GetComponentStates() + + if err != nil { + continue + } + if qsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if qsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && qsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= retry { + panic("Query service isn't ready") + } + if err := svr.SetQueryService(queryService); err != nil { + panic(err) + } + + // --- Master Service Client --- + ms.Params.Init() + addr := fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port) + log.Println("Master service address:", addr) + log.Println("Init master service client ...") + masterService, err := msc.NewGrpcClient(addr, 20*time.Second) + if err != nil { + panic(err) + } + + if err = masterService.Init(); err != nil { + panic(err) + } + + if err = masterService.Start(); err != nil { + panic(err) + } + + ticker := time.NewTicker(interval * time.Millisecond) + tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond) + defer func() { + ticker.Stop() + tcancel() + }() + + for { + var states *internalpb2.ComponentStates + select { + case <-ticker.C: + states, err = masterService.GetComponentStates() + if err != nil { + continue + } + case <-tctx.Done(): + return nil, errors.New("master client connect timeout") + } + if states.State.StateCode == internalpb2.StateCode_HEALTHY { + break + } + } + + if err := svr.SetMasterService(masterService); err != nil { + panic(err) + } + + // --- IndexService --- + is.Params.Init() + log.Println("Index service address:", is.Params.Address) + indexService := isc.NewClient(is.Params.Address) + + if err := indexService.Init(); err != nil { + panic(err) + } + + if err := indexService.Start(); err != nil { + panic(err) + } + + ticker = time.NewTicker(interval * time.Millisecond) + tctx, tcancel = context.WithTimeout(ctx, 10*interval*time.Millisecond) + defer func() { + ticker.Stop() + tcancel() + }() + + for { + var states *internalpb2.ComponentStates + select { + case <-ticker.C: + states, err = indexService.GetComponentStates() + if err != nil { + continue + } + case <-tctx.Done(): + return nil, errors.New("Index service client connect timeout") + } + if states.State.StateCode == internalpb2.StateCode_HEALTHY { + break + } + } + + if err := svr.SetIndexService(indexService); err != nil { + panic(err) + } + + // --- DataService --- + ds.Params.Init() + log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port) + log.Println("Init data service client ...") + dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + + for cnt = 0; cnt < retry; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + log.Printf("retry cout = %d, error = %s", cnt, err.Error()) + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + log.Printf("retry cout = %d, error = %s", cnt, err.Error()) + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= retry { + panic("Data service isn't ready") + } + + if err := svr.SetDataService(dataService); err != nil { + panic(err) + } + + return &QueryNode{ + + ctx: ctx, + svr: svr, + + dataService: dataService, + masterService: masterService, + indexService: indexService, + queryService: queryService, + }, nil + } -func (ps *QueryNode) Run() error { +func (q *QueryNode) Run() error { + if err := q.svr.Init(); err != nil { + panic(err) + } + + if err := q.svr.Start(); err != nil { + panic(err) + } + log.Println("Query node successfully started ...") return nil } -func (ps *QueryNode) Stop() error { - return nil +func (q *QueryNode) Stop() error { + _ = q.dataService.Stop() + _ = q.masterService.Stop() + _ = q.queryService.Stop() + _ = q.indexService.Stop() + return q.svr.Stop() } diff --git a/cmd/distributed/components/query_service.go b/cmd/distributed/components/query_service.go index fbcd28bed..93a8e569b 100644 --- a/cmd/distributed/components/query_service.go +++ b/cmd/distributed/components/query_service.go @@ -2,19 +2,138 @@ package components import ( "context" + "fmt" + "log" + "time" + + dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" + msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + qs "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" + + ds "github.com/zilliztech/milvus-distributed/internal/dataservice" + ms "github.com/zilliztech/milvus-distributed/internal/masterservice" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/queryservice" ) -func NewQueryService(ctx context.Context) (*QueryService, error) { - return nil, nil +type QueryService struct { + ctx context.Context + svr *qs.Server + + dataService *dsc.Client + masterService *msc.GrpcClient } -type QueryService struct { +func NewQueryService(ctx context.Context) (*QueryService, error) { + const retry = 10 + const interval = 200 + + queryservice.Params.Init() + svr := qs.NewServer(ctx) + log.Println("Queryservice id is", queryservice.Params.QueryServiceID) + + // --- Master Service Client --- + ms.Params.Init() + log.Printf("Master service address: %s:%d", ms.Params.Address, ms.Params.Port) + log.Println("Init master service client ...") + masterService, err := msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second) + if err != nil { + panic(err) + } + + if err = masterService.Init(); err != nil { + panic(err) + } + + if err = masterService.Start(); err != nil { + panic(err) + } + + var cnt int + for cnt = 0; cnt < retry; cnt++ { + time.Sleep(time.Duration(cnt*interval) * time.Millisecond) + if cnt != 0 { + log.Println("Master service isn't ready ...") + log.Printf("Retrying getting master service's states in ... %v ms", interval) + } + + msStates, err := masterService.GetComponentStates() + + if err != nil { + continue + } + if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if msStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= retry { + panic("Master service isn't ready") + } + + if err := svr.SetMasterService(masterService); err != nil { + panic(err) + } + + // --- Data service client --- + ds.Params.Init() + log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port) + log.Println("Init data service client ...") + dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port)) + if err = dataService.Init(); err != nil { + panic(err) + } + if err = dataService.Start(); err != nil { + panic(err) + } + + for cnt = 0; cnt < retry; cnt++ { + dsStates, err := dataService.GetComponentStates() + if err != nil { + continue + } + if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + continue + } + if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY { + continue + } + break + } + if cnt >= retry { + panic("Data service isn't ready") + } + + if err := svr.SetDataService(dataService); err != nil { + panic(err) + } + + return &QueryService{ + ctx: ctx, + svr: svr, + dataService: dataService, + masterService: masterService, + }, nil } -func (ps *QueryService) Run() error { +func (qs *QueryService) Run() error { + if err := qs.svr.Init(); err != nil { + panic(err) + } + + if err := qs.svr.Start(); err != nil { + panic(err) + } + log.Println("Data node successfully started ...") return nil } -func (ps *QueryService) Stop() error { - return nil +func (qs *QueryService) Stop() error { + _ = qs.dataService.Stop() + _ = qs.masterService.Stop() + return qs.svr.Stop() } diff --git a/cmd/querynode/querynode.go b/cmd/querynode/querynode.go index 060abd695..86e984cff 100644 --- a/cmd/querynode/querynode.go +++ b/cmd/querynode/querynode.go @@ -7,16 +7,20 @@ import ( "os/signal" "syscall" - "go.uber.org/zap" - - grpcquerynode "github.com/zilliztech/milvus-distributed/internal/distributed/querynode" + distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" ) func main() { - // Creates server. ctx, cancel := context.WithCancel(context.Background()) - svr := grpcquerynode.NewServer(ctx) - if err := svr.Init(); err != nil { + defer cancel() + + svr, err := distributed.NewQueryNode(ctx) + + if err != nil { + panic(err) + } + + if err = svr.Run(); err != nil { panic(err) } @@ -27,30 +31,10 @@ func main() { syscall.SIGTERM, syscall.SIGQUIT) - var sig os.Signal - go func() { - sig = <-sc - cancel() - }() - - if err := svr.Start(); err != nil { - panic(err) - } - - <-ctx.Done() - log.Print("Got signal to exit", zap.String("signal", sig.String())) + sig := <-sc + log.Print("Got signal to exit", sig.String()) if err := svr.Stop(); err != nil { panic(err) } - switch sig { - case syscall.SIGTERM: - exit(0) - default: - exit(1) - } -} - -func exit(code int) { - os.Exit(code) } diff --git a/cmd/queryservice/queryservice.go b/cmd/queryservice/queryservice.go index 7ec8a9ce2..1390e307f 100644 --- a/cmd/queryservice/queryservice.go +++ b/cmd/queryservice/queryservice.go @@ -7,23 +7,19 @@ import ( "os/signal" "syscall" - grpcqueryservice "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" - "github.com/zilliztech/milvus-distributed/internal/queryservice" + distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components" ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - svr := grpcqueryservice.NewServer(ctx) - - if err := svr.Init(); err != nil { + svr, err := distributed.NewQueryService(ctx) + if err != nil { panic(err) } - log.Printf("query service address : %s", queryservice.Params.Address) - - if err := svr.Start(); err != nil { + if err := svr.Run(); err != nil { panic(err) } @@ -35,5 +31,8 @@ func main() { syscall.SIGQUIT) sig := <-sc log.Printf("Got %s signal to exit", sig.String()) - _ = svr.Stop() + + if err := svr.Stop(); err != nil { + panic(err) + } } diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index b0903bd48..2c605b93f 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -7,6 +7,7 @@ import ( "sync" dn "github.com/zilliztech/milvus-distributed/internal/datanode" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -74,7 +75,10 @@ func (s *Server) Start() error { } func (s *Server) Stop() error { - return s.core.Stop() + err := s.core.Stop() + s.cancel() + s.grpcServer.GracefulStop() + return err } func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { @@ -86,6 +90,12 @@ func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelR } func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) { + if s.core.State != internalpb2.StateCode_HEALTHY { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "DataNode isn't healthy.", + }, errors.Errorf("DataNode is not ready yet") + } return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, }, s.core.FlushSegments(in) diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 77bf8b5af..463b8c800 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -2,7 +2,6 @@ package grpcquerynodeclient import ( "context" - "log" "time" "google.golang.org/grpc" @@ -12,9 +11,46 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) +const ( + RPCConnectionTimeout = 30 * time.Second + Retry = 3 +) + type Client struct { ctx context.Context grpcClient querypb.QueryNodeClient + conn *grpc.ClientConn + addr string +} + +func NewClient(address string) *Client { + return &Client{ + addr: address, + } +} + +func (c *Client) Init() error { + ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout) + defer cancel() + var err error + for i := 0; i < Retry; i++ { + if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil { + break + } + } + if err != nil { + return err + } + c.grpcClient = querypb.NewQueryNodeClient(c.conn) + return nil +} + +func (c *Client) Start() error { + return nil +} + +func (c *Client) Stop() error { + return c.conn.Close() } func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { @@ -60,17 +96,3 @@ func (c *Client) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, func (c *Client) ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) { return c.grpcClient.ReleaseSegments(context.TODO(), in) } - -func NewClient(address string) *Client { - ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - log.Printf("connect to queryNode failed, error= %v", err) - } - log.Printf("connected to queryNode, queryNode=%s", address) - - return &Client{ - grpcClient: querypb.NewQueryNodeClient(conn), - } -} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 9c16007fb..2a6e0c880 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -5,78 +5,58 @@ import ( "fmt" "log" "net" - "time" + "sync" "google.golang.org/grpc" - "github.com/zilliztech/milvus-distributed/internal/dataservice" - grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" - grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client" - grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" - grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client" - "github.com/zilliztech/milvus-distributed/internal/indexservice" - "github.com/zilliztech/milvus-distributed/internal/masterservice" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb" - "github.com/zilliztech/milvus-distributed/internal/querynode" - "github.com/zilliztech/milvus-distributed/internal/queryservice" + qn "github.com/zilliztech/milvus-distributed/internal/querynode" ) type Server struct { + node *qn.QueryNode + grpcServer *grpc.Server - node *querynode.QueryNode -} + grpcError error + grpcErrMux sync.Mutex -func NewServer(ctx context.Context) *Server { - server := &Server{ - node: querynode.NewQueryNodeWithoutID(ctx), - } + ctx context.Context + cancel context.CancelFunc +} - queryservice.Params.Init() - queryClient := grpcqueryserviceclient.NewClient(queryservice.Params.Address) - if err := server.node.SetQueryService(queryClient); err != nil { - panic(err) +func NewServer(ctx context.Context) (*Server, error) { + s := &Server{ + ctx: ctx, + node: qn.NewQueryNodeWithoutID(ctx), } - masterservice.Params.Init() - masterConnectTimeout := 10 * time.Second - masterClient, err := grpcmasterserviceclient.NewGrpcClient(masterservice.Params.Address, masterConnectTimeout) + qn.Params.Init() + s.grpcServer = grpc.NewServer() + querypb.RegisterQueryNodeServer(s.grpcServer, s) + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", qn.Params.QueryNodePort)) if err != nil { - panic(err) - } - if err = server.node.SetMasterService(masterClient); err != nil { - panic(err) - } - - indexservice.Params.Init() - indexClient := grpcindexserviceclient.NewClient(indexservice.Params.Address) - if err := server.node.SetIndexService(indexClient); err != nil { - panic(err) + return nil, err } - dataservice.Params.Init() - log.Println("connect to data service, address =", fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port)) - dataClient := grpcdataserviceclient.NewClient(fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port)) - if err := server.node.SetDataService(dataClient); err != nil { - panic(err) - } + go func() { + log.Println("start query node grpc server...") + if err = s.grpcServer.Serve(lis); err != nil { + s.grpcErrMux.Lock() + defer s.grpcErrMux.Unlock() + s.grpcError = err + } + }() - return server -} + s.grpcErrMux.Lock() + err = s.grpcError + s.grpcErrMux.Unlock() -func (s *Server) StartGrpcServer() { - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", querynode.Params.QueryNodePort)) if err != nil { - panic(err) - } - - s.grpcServer = grpc.NewServer() - querypb.RegisterQueryNodeServer(s.grpcServer, s) - fmt.Println("start query node grpc server...") - if err = s.grpcServer.Serve(lis); err != nil { - panic(err) + return nil, err } + return s, nil } func (s *Server) Init() error { @@ -84,13 +64,30 @@ func (s *Server) Init() error { } func (s *Server) Start() error { - go s.StartGrpcServer() return s.node.Start() } func (s *Server) Stop() error { - s.grpcServer.Stop() - return s.node.Stop() + err := s.node.Stop() + s.cancel() + s.grpcServer.GracefulStop() + return err +} + +func (s *Server) SetMasterService(master qn.MasterServiceInterface) error { + return s.node.SetMasterService(master) +} + +func (s *Server) SetQueryService(query qn.QueryServiceInterface) error { + return s.node.SetQueryService(query) +} + +func (s *Server) SetIndexService(index qn.IndexServiceInterface) error { + return s.node.SetIndexService(index) +} + +func (s *Server) SetDataService(data qn.DataServiceInterface) error { + return s.node.SetDataService(data) } func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { diff --git a/internal/distributed/queryservice/client/client.go b/internal/distributed/queryservice/client/client.go index 78737799b..5b1eaf09c 100644 --- a/internal/distributed/queryservice/client/client.go +++ b/internal/distributed/queryservice/client/client.go @@ -2,6 +2,7 @@ package grpcqueryserviceclient import ( "context" + "errors" "log" "time" @@ -14,30 +15,76 @@ import ( type Client struct { grpcClient querypb.QueryServiceClient + conn *grpc.ClientConn + + addr string + timeout time.Duration + retry int +} + +func NewClient(address string, timeout time.Duration) (*Client, error) { + + return &Client{ + grpcClient: nil, + conn: nil, + addr: address, + timeout: timeout, + retry: 3, + }, nil } func (c *Client) Init() error { - panic("implement me") + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + var err error + for i := 0; i < c.retry; i++ { + if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err != nil { + break + } + } + + if err != nil { + return err + } + + c.grpcClient = querypb.NewQueryServiceClient(c.conn) + log.Printf("connected to queryService, queryService=%s", c.addr) + return nil } func (c *Client) Start() error { - panic("implement me") + return nil } func (c *Client) Stop() error { - panic("implement me") + return c.conn.Close() } func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { - panic("implement me") + return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{}) } func (c *Client) GetTimeTickChannel() (string, error) { - panic("implement me") + resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{}) + if err != nil { + return "", err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return "", errors.New(resp.Status.Reason) + } + return resp.Value, nil } func (c *Client) GetStatisticsChannel() (string, error) { - panic("implement me") + resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{}) + if err != nil { + return "", err + } + if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS { + return "", errors.New(resp.Status.Reason) + } + return resp.Value, nil } func (c *Client) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { @@ -75,17 +122,3 @@ func (c *Client) CreateQueryChannel() (*querypb.CreateQueryChannelResponse, erro func (c *Client) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { return c.grpcClient.GetPartitionStates(context.TODO(), req) } - -func NewClient(address string) *Client { - ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock()) - if err != nil { - log.Printf("connect to queryService failed, error= %v", err) - } - log.Printf("connected to queryService, queryService=%s", address) - - return &Client{ - grpcClient: querypb.NewQueryServiceClient(conn), - } -} diff --git a/internal/distributed/queryservice/service.go b/internal/distributed/queryservice/service.go index 72a2f801d..f0119cfc3 100644 --- a/internal/distributed/queryservice/service.go +++ b/internal/distributed/queryservice/service.go @@ -12,6 +12,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" + "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -119,6 +120,16 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) }, nil } +func (s *Server) SetMasterService(m queryservice.MasterServiceInterface) error { + s.queryService.SetMasterService(m) + return nil +} + +func (s *Server) SetDataService(d queryservice.DataServiceInterface) error { + s.queryService.SetDataService(d) + return nil +} + func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { return s.queryService.RegisterNode(req) } @@ -157,7 +168,7 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (* func NewServer(ctx context.Context) *Server { ctx1, cancel := context.WithCancel(ctx) - service, err := queryservice.NewQueryService(ctx) + service, err := queryservice.NewQueryService(ctx1) if err != nil { log.Fatal(errors.New("create QueryService failed")) } -- GitLab