service.go 5.2 KB
Newer Older
X
xige-16 已提交
1
package grpcqueryservice
2 3

import (
X
xige-16 已提交
4 5 6 7 8 9
	"context"
	"log"
	"net"
	"strconv"
	"sync"

10 11
	"google.golang.org/grpc"

G
groot 已提交
12
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
xige-16 已提交
13
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
X
xige-16 已提交
14
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
X
xige-16 已提交
15
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
16
	"github.com/zilliztech/milvus-distributed/internal/proto/querypb"
X
XuanYang-cn 已提交
17
	qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
18 19 20
)

type Server struct {
X
XuanYang-cn 已提交
21 22 23
	grpcServer *grpc.Server
	grpcError  error
	grpcErrMux sync.Mutex
X
xige-16 已提交
24 25

	loopCtx    context.Context
X
XuanYang-cn 已提交
26 27 28
	loopCancel context.CancelFunc

	queryService *qs.QueryService
G
groot 已提交
29 30

	msFactory msgstream.Factory
X
XuanYang-cn 已提交
31 32
}

G
groot 已提交
33
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
X
XuanYang-cn 已提交
34
	ctx1, cancel := context.WithCancel(ctx)
G
groot 已提交
35
	service, err := qs.NewQueryService(ctx1, factory)
X
XuanYang-cn 已提交
36 37 38 39 40 41 42 43 44
	if err != nil {
		cancel()
		return nil, err
	}

	return &Server{
		queryService: service,
		loopCtx:      ctx1,
		loopCancel:   cancel,
G
groot 已提交
45
		msFactory:    factory,
X
XuanYang-cn 已提交
46
	}, nil
X
xige-16 已提交
47 48
}

N
neza2017 已提交
49
func (s *Server) Init() error {
X
xige-16 已提交
50
	log.Println("query service init")
51
	if err := s.queryService.Init(); err != nil {
52
		return err
53
	}
X
xige-16 已提交
54 55 56
	return nil
}

N
neza2017 已提交
57
func (s *Server) Start() error {
X
XuanYang-cn 已提交
58 59 60 61 62 63
	log.Println("start query service ...")

	s.grpcServer = grpc.NewServer()
	querypb.RegisterQueryServiceServer(s.grpcServer, s)
	log.Println("Starting start query service Server")
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(qs.Params.Port))
X
xige-16 已提交
64 65 66
	if err != nil {
		return err
	}
X
XuanYang-cn 已提交
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83

	go func() {
		if err := s.grpcServer.Serve(lis); err != nil {
			s.grpcErrMux.Lock()
			defer s.grpcErrMux.Unlock()
			s.grpcError = err
		}
	}()

	s.grpcErrMux.Lock()
	err = s.grpcError
	s.grpcErrMux.Unlock()

	if err != nil {
		return err
	}

X
xige-16 已提交
84 85
	s.queryService.Start()
	return nil
X
xige-16 已提交
86 87
}

N
neza2017 已提交
88
func (s *Server) Stop() error {
X
XuanYang-cn 已提交
89
	err := s.queryService.Stop()
X
xige-16 已提交
90 91 92 93
	s.loopCancel()
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}
X
XuanYang-cn 已提交
94
	return err
X
xige-16 已提交
95 96
}

X
xige-16 已提交
97
func (s *Server) GetComponentStates(ctx context.Context, req *commonpb.Empty) (*internalpb2.ComponentStates, error) {
X
xige-16 已提交
98 99
	componentStates, err := s.queryService.GetComponentStates()
	if err != nil {
X
xige-16 已提交
100
		return &internalpb2.ComponentStates{
X
xige-16 已提交
101 102 103 104 105 106 107
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

X
xige-16 已提交
108
	return componentStates, nil
X
xige-16 已提交
109 110
}

X
xige-16 已提交
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
// GetTimeTickChannel asks the wrapped query service for its time tick channel
// and returns it in a StringResponse with a SUCCESS status. On failure the
// response carries only a Status with ErrorCode_UNEXPECTED_ERROR and the error
// text, and the error is returned as well. ctx and req are not used.
func (s *Server) GetTimeTickChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
	name, err := s.queryService.GetTimeTickChannel()
	if err != nil {
		failure := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
		}
		return &milvuspb.StringResponse{Status: failure}, err
	}

	ok := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
		Reason:    "",
	}
	return &milvuspb.StringResponse{Status: ok, Value: name}, nil
}

func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) (*milvuspb.StringResponse, error) {
	statisticsChannel, err := s.queryService.GetStatisticsChannel()
	if err != nil {
		return &milvuspb.StringResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			},
		}, err
	}

	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
			Reason:    "",
		},
		Value: statisticsChannel,
	}, nil
X
xige-16 已提交
149 150
}

X
XuanYang-cn 已提交
151
func (s *Server) SetMasterService(m qs.MasterServiceInterface) error {
152 153 154 155
	s.queryService.SetMasterService(m)
	return nil
}

X
XuanYang-cn 已提交
156
func (s *Server) SetDataService(d qs.DataServiceInterface) error {
157 158 159 160
	s.queryService.SetDataService(d)
	return nil
}

X
xige-16 已提交
161 162
func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
	return s.queryService.RegisterNode(req)
X
xige-16 已提交
163 164
}

X
xige-16 已提交
165 166
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionRequest) (*querypb.ShowCollectionResponse, error) {
	return s.queryService.ShowCollections(req)
167 168
}

X
xige-16 已提交
169 170
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
	return s.queryService.LoadCollection(req)
171 172
}

X
xige-16 已提交
173 174
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	return s.queryService.ReleaseCollection(req)
175 176
}

X
xige-16 已提交
177 178
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionRequest) (*querypb.ShowPartitionResponse, error) {
	return s.queryService.ShowPartitions(req)
179 180
}

X
xige-16 已提交
181 182
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
	return s.queryService.GetPartitionStates(req)
183 184
}

X
xige-16 已提交
185 186
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionRequest) (*commonpb.Status, error) {
	return s.queryService.LoadPartitions(req)
187 188
}

X
xige-16 已提交
189 190
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionRequest) (*commonpb.Status, error) {
	return s.queryService.ReleasePartitions(req)
191 192
}

X
xige-16 已提交
193 194
func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*querypb.CreateQueryChannelResponse, error) {
	return s.queryService.CreateQueryChannel()
195 196
}

B
bigsheeper 已提交
197 198 199
// GetSegmentInfo forwards req to the wrapped query service and returns its
// response and error unchanged. ctx is not used.
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.SegmentInfoRequest) (*querypb.SegmentInfoResponse, error) {
	resp, err := s.queryService.GetSegmentInfo(req)
	return resp, err
}