service.go 5.4 KB
Newer Older
X
xige-16 已提交
1
package grpcqueryservice
2 3

import (
X
xige-16 已提交
4 5 6 7 8
	"context"
	"log"
	"net"
	"strconv"
	"sync"
X
xige-16 已提交
9
	"time"
X
xige-16 已提交
10

11 12
	"google.golang.org/grpc"

X
xige-16 已提交
13 14
	"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
	masterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
X
xige-16 已提交
15 16
	"github.com/zilliztech/milvus-distributed/internal/errors"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
X
xige-16 已提交
17
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
X
xige-16 已提交
18
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
19
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
X
xige-16 已提交
20
	"github.com/zilliztech/milvus-distributed/internal/queryservice"
21 22
)

X
xige-16 已提交
23 24
type QueryService = queryservice.QueryService

25 26
type Server struct {
	grpcServer   *grpc.Server
X
xige-16 已提交
27 28 29 30 31
	queryService *QueryService

	loopCtx    context.Context
	loopCancel func()
	loopWg     sync.WaitGroup
X
xige-16 已提交
32 33
}

N
neza2017 已提交
34
func (s *Server) Init() error {
X
xige-16 已提交
35
	log.Println("query service init")
X
xige-16 已提交
36
	s.queryService.Init()
X
xige-16 已提交
37
	s.queryService.SetEnableGrpc(true)
X
xige-16 已提交
38 39 40
	return nil
}

N
neza2017 已提交
41
func (s *Server) Start() error {
X
xige-16 已提交
42 43 44 45 46 47 48
	masterServiceClient, err := masterservice.NewGrpcClient(queryservice.Params.MasterServiceAddress, 30*time.Second)
	if err != nil {
		return err
	}
	s.queryService.SetMasterService(masterServiceClient)
	dataServiceClient := dataservice.NewClient(queryservice.Params.DataServiceAddress)
	s.queryService.SetDataService(dataServiceClient)
X
xige-16 已提交
49 50 51 52 53
	log.Println("start query service ...")
	s.loopWg.Add(1)
	go s.grpcLoop()
	s.queryService.Start()
	return nil
X
xige-16 已提交
54 55
}

N
neza2017 已提交
56
func (s *Server) Stop() error {
X
xige-16 已提交
57 58 59 60 61 62 63 64 65
	s.queryService.Stop()
	s.loopCancel()
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}
	s.loopWg.Wait()
	return nil
}

X
xige-16 已提交
66
func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) {
X
xige-16 已提交
67 68
	componentStates, err := s.queryService.GetComponentStates()
	if err != nil {
X
xige-16 已提交
69
		return &internalpb2.ComponentStates{
X
xige-16 已提交
70 71 72 73 74 75 76
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

X
xige-16 已提交
77
	return componentStates, nil
X
xige-16 已提交
78 79
}

X
xige-16 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
	channel, err := s.queryService.GetTimeTickChannel()
	if err != nil {
		return &milvuspb.StringResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
			Reason:    "",
		},
		Value: channel,
	}, nil
}

func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
	statisticsChannel, err := s.queryService.GetStatisticsChannel()
	if err != nil {
		return &milvuspb.StringResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
			Reason:    "",
		},
		Value: statisticsChannel,
	}, nil
X
xige-16 已提交
118 119
}

X
xige-16 已提交
120 121
func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
	return s.queryService.RegisterNode(req)
X
xige-16 已提交
122 123
}

X
xige-16 已提交
124 125
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
	return s.queryService.ShowCollections(req)
126 127
}

X
xige-16 已提交
128 129
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
	return s.queryService.LoadCollection(req)
130 131
}

X
xige-16 已提交
132 133
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	return s.queryService.ReleaseCollection(req)
134 135
}

X
xige-16 已提交
136 137
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) {
	return s.queryService.ShowPartitions(req)
138 139
}

X
xige-16 已提交
140 141
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
	return s.queryService.GetPartitionStates(req)
142 143
}

X
xige-16 已提交
144 145
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error) {
	return s.queryService.LoadPartitions(req)
146 147
}

X
xige-16 已提交
148 149
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error) {
	return s.queryService.ReleasePartitions(req)
150 151
}

X
xige-16 已提交
152 153
func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*querypb.CreateQueryChannelResponse, error) {
	return s.queryService.CreateQueryChannel()
154 155
}

X
xige-16 已提交
156 157
func (s *Server) NewServer(ctx context.Context) *Server {
	ctx1, cancel := context.WithCancel(ctx)
X
xige-16 已提交
158
	service, err := queryservice.NewQueryService(ctx)
X
xige-16 已提交
159 160 161 162 163
	if err != nil {
		log.Fatal(errors.New("create QueryService failed"))
	}

	return &Server{
X
xige-16 已提交
164
		queryService: service,
X
xige-16 已提交
165 166 167
		loopCtx:      ctx1,
		loopCancel:   cancel,
	}
168 169
}

X
xige-16 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
func (s *Server) grpcLoop() {
	defer s.loopWg.Done()

	log.Println("Starting start query service Server")
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(queryservice.Params.Port))
	if err != nil {
		log.Fatalf("query service grpc server fatal error=%v", err)
	}

	s.grpcServer = grpc.NewServer()
	querypb.RegisterQueryServiceServer(s.grpcServer, s)

	log.Println("queryService's server register finished")
	if err = s.grpcServer.Serve(lis); err != nil {
		log.Fatalf("queryService grpc server fatal error=%v", err)
	}
	log.Println("query service grpc server starting...")
187
}