service.go 8.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcquerycoord
13 14

import (
X
xige-16 已提交
15
	"context"
B
bigsheeper 已提交
16
	"io"
X
xige-16 已提交
17 18 19
	"net"
	"strconv"
	"sync"
20 21
	"time"

G
godchen 已提交
22
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
23
	dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
24
	rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
X
Xiangyu Wang 已提交
25 26
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
27
	qc "github.com/milvus-io/milvus/internal/querycoord"
X
Xiangyu Wang 已提交
28 29 30
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/trace"
G
godchen 已提交
31 32 33
	"go.uber.org/zap"
	"google.golang.org/grpc"

X
Xiangyu Wang 已提交
34 35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
38 39 40
)

type Server struct {
41
	wg         sync.WaitGroup
X
xige-16 已提交
42
	loopCtx    context.Context
X
XuanYang-cn 已提交
43
	loopCancel context.CancelFunc
44 45 46
	grpcServer *grpc.Server

	grpcErrChan chan error
X
XuanYang-cn 已提交
47

48
	queryCoord types.QueryCoordComponent
G
groot 已提交
49 50

	msFactory msgstream.Factory
51

G
groot 已提交
52 53
	dataCoord types.DataCoord
	rootCoord types.RootCoord
B
bigsheeper 已提交
54 55

	closer io.Closer
X
XuanYang-cn 已提交
56 57
}

G
groot 已提交
58
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
X
XuanYang-cn 已提交
59
	ctx1, cancel := context.WithCancel(ctx)
60
	svr, err := qc.NewQueryCoord(ctx1, factory)
X
XuanYang-cn 已提交
61 62 63 64 65 66
	if err != nil {
		cancel()
		return nil, err
	}

	return &Server{
67 68 69 70 71
		queryCoord:  svr,
		loopCtx:     ctx1,
		loopCancel:  cancel,
		msFactory:   factory,
		grpcErrChan: make(chan error),
X
XuanYang-cn 已提交
72
	}, nil
X
xige-16 已提交
73 74
}

75 76 77 78 79
func (s *Server) Run() error {

	if err := s.init(); err != nil {
		return err
	}
80
	log.Debug("QueryCoord init done ...")
81 82

	if err := s.start(); err != nil {
83
		return err
84
	}
X
xige-16 已提交
85 86 87
	return nil
}

88 89
func (s *Server) init() error {
	Params.Init()
90 91

	qc.Params.InitOnce()
92
	qc.Params.Port = Params.Port
B
bigsheeper 已提交
93

G
godchen 已提交
94
	closer := trace.InitTracing("querycoord")
B
bigsheeper 已提交
95
	s.closer = closer
X
XuanYang-cn 已提交
96

97
	if err := s.queryCoord.Register(); err != nil {
98 99 100
		return err
	}

101 102 103
	s.wg.Add(1)
	go s.startGrpcLoop(Params.Port)
	// wait for grpc server loop start
104 105
	err := <-s.grpcErrChan
	if err != nil {
X
xige-16 已提交
106 107
		return err
	}
X
XuanYang-cn 已提交
108

109
	// --- Master Server Client ---
110
	log.Debug("QueryCoord try to new RootCoord client", zap.Any("RootCoordAddress", Params.RootCoordAddress))
111 112 113 114 115 116 117

	if s.rootCoord == nil {
		s.rootCoord, err = rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
		if err != nil {
			log.Debug("QueryCoord try to new RootCoord client failed", zap.Error(err))
			panic(err)
		}
118 119
	}

120
	if err = s.rootCoord.Init(); err != nil {
121
		log.Debug("QueryCoord RootCoordClient Init failed", zap.Error(err))
122 123 124
		panic(err)
	}

125
	if err = s.rootCoord.Start(); err != nil {
126
		log.Debug("QueryCoord RootCoordClient Start failed", zap.Error(err))
127 128 129
		panic(err)
	}
	// wait for master init or healthy
130
	log.Debug("QueryCoord try to wait for RootCoord ready")
131
	err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
132
	if err != nil {
133
		log.Debug("QueryCoord wait for RootCoord ready failed", zap.Error(err))
134 135 136
		panic(err)
	}

137
	if err := s.SetRootCoord(s.rootCoord); err != nil {
138 139
		panic(err)
	}
140
	log.Debug("QueryCoord report RootCoord ready")
141 142

	// --- Data service client ---
143
	log.Debug("QueryCoord try to new DataCoord client", zap.Any("DataCoordAddress", Params.DataCoordAddress))
144

145 146 147 148 149 150
	if s.dataCoord == nil {
		s.dataCoord, err = dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
		if err != nil {
			log.Debug("QueryCoord try to new DataCoord client failed", zap.Error(err))
			panic(err)
		}
G
godchen 已提交
151
	}
152 153

	if err = s.dataCoord.Init(); err != nil {
154
		log.Debug("QueryCoord DataCoordClient Init failed", zap.Error(err))
155 156
		panic(err)
	}
157
	if err = s.dataCoord.Start(); err != nil {
158
		log.Debug("QueryCoord DataCoordClient Start failed", zap.Error(err))
159 160
		panic(err)
	}
161
	log.Debug("QueryCoord try to wait for DataCoord ready")
162
	err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
163
	if err != nil {
164
		log.Debug("QueryCoord wait for DataCoord ready failed", zap.Error(err))
165 166
		panic(err)
	}
167
	if err := s.SetDataCoord(s.dataCoord); err != nil {
168
		panic(err)
X
XuanYang-cn 已提交
169
	}
170
	log.Debug("QueryCoord report DataCoord ready")
X
XuanYang-cn 已提交
171

172 173 174
	s.queryCoord.UpdateStateCode(internalpb.StateCode_Initializing)
	log.Debug("QueryCoord", zap.Any("State", internalpb.StateCode_Initializing))
	if err := s.queryCoord.Init(); err != nil {
175 176
		return err
	}
X
xige-16 已提交
177
	return nil
X
xige-16 已提交
178 179
}

180 181 182 183
func (s *Server) startGrpcLoop(grpcPort int) {

	defer s.wg.Done()

B
bigsheeper 已提交
184
	log.Debug("network", zap.String("port", strconv.Itoa(grpcPort)))
185 186
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
	if err != nil {
B
bigsheeper 已提交
187
		log.Debug("GrpcServer:failed to listen:", zap.String("error", err.Error()))
188 189 190 191 192 193 194
		s.grpcErrChan <- err
		return
	}

	ctx, cancel := context.WithCancel(s.loopCtx)
	defer cancel()

G
godchen 已提交
195
	opts := trace.GetInterceptorOpts()
196
	s.grpcServer = grpc.NewServer(
197 198
		grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
		grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
199
		grpc.UnaryInterceptor(
G
godchen 已提交
200
			grpc_opentracing.UnaryServerInterceptor(opts...)),
G
godchen 已提交
201
		grpc.StreamInterceptor(
G
godchen 已提交
202
			grpc_opentracing.StreamServerInterceptor(opts...)))
203
	querypb.RegisterQueryCoordServer(s.grpcServer, s)
204 205 206 207 208 209 210 211

	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
	}
}

func (s *Server) start() error {
212
	return s.queryCoord.Start()
213 214
}

N
neza2017 已提交
215
func (s *Server) Stop() error {
N
neza2017 已提交
216 217 218 219
	if s.closer != nil {
		if err := s.closer.Close(); err != nil {
			return err
		}
B
bigsheeper 已提交
220
	}
221
	err := s.queryCoord.Stop()
X
xige-16 已提交
222 223 224 225
	s.loopCancel()
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}
X
XuanYang-cn 已提交
226
	return err
X
xige-16 已提交
227 228
}

229
func (s *Server) SetRootCoord(m types.RootCoord) error {
230
	s.queryCoord.SetRootCoord(m)
G
godchen 已提交
231
	return nil
X
xige-16 已提交
232 233
}

234
func (s *Server) SetDataCoord(d types.DataCoord) error {
235
	s.queryCoord.SetDataCoord(d)
G
godchen 已提交
236
	return nil
X
xige-16 已提交
237 238
}

G
godchen 已提交
239
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
240
	return s.queryCoord.GetComponentStates(ctx)
X
xige-16 已提交
241 242
}

G
godchen 已提交
243
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
244
	return s.queryCoord.GetTimeTickChannel(ctx)
245 246
}

G
godchen 已提交
247
func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
248
	return s.queryCoord.GetStatisticsChannel(ctx)
249 250
}

G
godchen 已提交
251
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
252
	return s.queryCoord.ShowCollections(ctx, req)
253 254
}

X
xige-16 已提交
255
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
256
	return s.queryCoord.LoadCollection(ctx, req)
257 258
}

X
xige-16 已提交
259
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
260
	return s.queryCoord.ReleaseCollection(ctx, req)
261 262
}

G
godchen 已提交
263
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
264
	return s.queryCoord.ShowPartitions(ctx, req)
265 266
}

G
godchen 已提交
267
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
268
	return s.queryCoord.GetPartitionStates(ctx, req)
269 270
}

G
godchen 已提交
271
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
272
	return s.queryCoord.LoadPartitions(ctx, req)
273 274
}

G
godchen 已提交
275
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
276
	return s.queryCoord.ReleasePartitions(ctx, req)
277 278
}

G
godchen 已提交
279
func (s *Server) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
280
	return s.queryCoord.CreateQueryChannel(ctx, req)
281 282
}

G
godchen 已提交
283
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
284
	return s.queryCoord.GetSegmentInfo(ctx, req)
B
bigsheeper 已提交
285
}
286 287 288 289

// GetMetrics is the gRPC handler that forwards ctx and req unchanged to the
// wrapped QueryCoord and returns its response and error as-is.
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	return s.queryCoord.GetMetrics(ctx, req)
}