service.go 9.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

X
xige-16 已提交
12
package grpcquerynode
13 14 15

import (
	"context"
G
godchen 已提交
16 17
	"fmt"
	"io"
18
	"net"
19
	"strconv"
20
	"sync"
21 22
	"time"

X
Xiangyu Wang 已提交
23
	"github.com/milvus-io/milvus/internal/util/retry"
S
sunby 已提交
24

X
Xiangyu Wang 已提交
25
	"github.com/milvus-io/milvus/internal/types"
T
ThreadDao 已提交
26

B
bigsheeper 已提交
27
	"go.uber.org/zap"
28 29
	"google.golang.org/grpc"

G
godchen 已提交
30
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
31
	dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
32
	isc "github.com/milvus-io/milvus/internal/distributed/indexcoord/client"
33
	rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
X
Xiangyu Wang 已提交
34 35 36 37 38 39 40 41 42 43
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	qn "github.com/milvus-io/milvus/internal/querynode"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/trace"
	"github.com/milvus-io/milvus/internal/util/typeutil"
44 45
)

46 47
// UniqueID is an alias of typeutil.UniqueID.
type UniqueID = typeutil.UniqueID

48
type Server struct {
G
godchen 已提交
49
	querynode   *qn.QueryNode
50 51 52 53
	wg          sync.WaitGroup
	ctx         context.Context
	cancel      context.CancelFunc
	grpcErrChan chan error
54

55 56
	grpcServer *grpc.Server

57 58 59
	dataCoord  *dsc.Client
	rootCoord  *rcc.GrpcClient
	indexCoord *isc.Client
G
godchen 已提交
60 61

	closer io.Closer
62
}
63

G
groot 已提交
64
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
65 66
	ctx1, cancel := context.WithCancel(ctx)

67
	s := &Server{
68 69
		ctx:         ctx1,
		cancel:      cancel,
70
		querynode:   qn.NewQueryNode(ctx, factory),
71
		grpcErrChan: make(chan error),
B
bigsheeper 已提交
72
	}
73 74
	return s, nil
}
B
bigsheeper 已提交
75

76 77 78 79 80
func (s *Server) init() error {
	Params.Init()
	Params.LoadFromEnv()
	Params.LoadFromArgs()

B
bigsheeper 已提交
81 82 83 84 85
	qn.Params.Init()
	qn.Params.QueryNodeIP = Params.QueryNodeIP
	qn.Params.QueryNodePort = int64(Params.QueryNodePort)
	qn.Params.QueryNodeID = Params.QueryNodeID

N
neza2017 已提交
86
	closer := trace.InitTracing(fmt.Sprintf("query_node ip: %s, port: %d", Params.QueryNodeIP, Params.QueryNodePort))
G
godchen 已提交
87 88
	s.closer = closer

B
bigsheeper 已提交
89
	log.Debug("QueryNode", zap.Int("port", Params.QueryNodePort))
90 91 92
	s.wg.Add(1)
	go s.startGrpcLoop(Params.QueryNodePort)
	// wait for grpc server loop start
N
neza2017 已提交
93
	err := <-s.grpcErrChan
B
bigsheeper 已提交
94
	if err != nil {
95 96
		return err
	}
97

98
	// --- RootCoord Client ---
99
	//ms.Params.Init()
100
	addr := Params.RootCoordAddress
101

102
	log.Debug("QueryNode start to new RootCoordClient", zap.Any("QueryCoordAddress", addr))
G
godchen 已提交
103
	rootCoord, err := rcc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
104
	if err != nil {
105
		log.Debug("QueryNode new RootCoordClient failed", zap.Error(err))
106 107 108
		panic(err)
	}

109 110
	if err = rootCoord.Init(); err != nil {
		log.Debug("QueryNode RootCoordClient Init failed", zap.Error(err))
111 112 113
		panic(err)
	}

114 115
	if err = rootCoord.Start(); err != nil {
		log.Debug("QueryNode RootCoordClient Start failed", zap.Error(err))
116 117
		panic(err)
	}
118 119
	log.Debug("QueryNode start to wait for RootCoord ready")
	err = funcutil.WaitForComponentHealthy(s.ctx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
120
	if err != nil {
121
		log.Debug("QueryNode wait for RootCoord ready failed", zap.Error(err))
122 123
		panic(err)
	}
124
	log.Debug("QueryNode report RootCoord is ready")
125

126
	if err := s.SetRootCoord(rootCoord); err != nil {
127 128 129
		panic(err)
	}

130
	// --- IndexCoord ---
131
	log.Debug("Index coord", zap.String("address", Params.IndexCoordAddress))
G
godchen 已提交
132
	indexCoord, err := isc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
G
godchen 已提交
133 134 135 136 137

	if err != nil {
		log.Debug("QueryNode new IndexCoordClient failed", zap.Error(err))
		panic(err)
	}
138

139 140
	if err := indexCoord.Init(); err != nil {
		log.Debug("QueryNode IndexCoordClient Init failed", zap.Error(err))
141 142 143
		panic(err)
	}

144 145
	if err := indexCoord.Start(); err != nil {
		log.Debug("QueryNode IndexCoordClient Start failed", zap.Error(err))
146 147
		panic(err)
	}
148 149 150
	// wait IndexCoord healthy
	log.Debug("QueryNode start to wait for IndexCoord ready")
	err = funcutil.WaitForComponentHealthy(s.ctx, indexCoord, "IndexCoord", 1000000, time.Millisecond*200)
151
	if err != nil {
152
		log.Debug("QueryNode wait for IndexCoord ready failed", zap.Error(err))
153 154
		panic(err)
	}
155
	log.Debug("QueryNode report IndexCoord is ready")
156

157
	if err := s.SetIndexCoord(indexCoord); err != nil {
158 159 160
		panic(err)
	}

G
godchen 已提交
161
	s.querynode.UpdateStateCode(internalpb.StateCode_Initializing)
162
	log.Debug("QueryNode", zap.Any("State", internalpb.StateCode_Initializing))
G
godchen 已提交
163
	if err := s.querynode.Init(); err != nil {
164
		log.Error("QueryNode init error: ", zap.Error(err))
165 166
		return err
	}
167 168 169 170

	if err := s.querynode.Register(); err != nil {
		return err
	}
171
	return nil
172 173
}

174
func (s *Server) start() error {
G
godchen 已提交
175
	return s.querynode.Start()
176 177
}

178 179 180
func (s *Server) startGrpcLoop(grpcPort int) {
	defer s.wg.Done()

S
sunby 已提交
181 182
	var lis net.Listener
	var err error
G
godchen 已提交
183
	err = retry.Do(s.ctx, func() error {
S
sunby 已提交
184 185 186
		addr := ":" + strconv.Itoa(grpcPort)
		lis, err = net.Listen("tcp", addr)
		if err == nil {
187
			qn.Params.QueryNodePort = int64(lis.Addr().(*net.TCPAddr).Port)
S
sunby 已提交
188 189 190 191 192
		} else {
			// set port=0 to get next available port
			grpcPort = 0
		}
		return err
G
godchen 已提交
193
	}, retry.Attempts(10))
194
	if err != nil {
B
bigsheeper 已提交
195
		log.Error("QueryNode GrpcServer:failed to listen", zap.Error(err))
196 197 198 199
		s.grpcErrChan <- err
		return
	}

G
godchen 已提交
200
	opts := trace.GetInterceptorOpts()
201
	s.grpcServer = grpc.NewServer(
202 203
		grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
		grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
204
		grpc.UnaryInterceptor(
G
godchen 已提交
205
			grpc_opentracing.UnaryServerInterceptor(opts...)),
G
godchen 已提交
206
		grpc.StreamInterceptor(
G
godchen 已提交
207
			grpc_opentracing.StreamServerInterceptor(opts...)))
208 209 210 211 212 213 214
	querypb.RegisterQueryNodeServer(s.grpcServer, s)

	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()

	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
B
bigsheeper 已提交
215
		log.Debug("QueryNode Start Grpc Failed!!!!")
216 217 218 219 220 221 222 223 224 225
		s.grpcErrChan <- err
	}

}

func (s *Server) Run() error {

	if err := s.init(); err != nil {
		return err
	}
B
bigsheeper 已提交
226
	log.Debug("QueryNode init done ...")
227 228 229 230

	if err := s.start(); err != nil {
		return err
	}
B
bigsheeper 已提交
231
	log.Debug("QueryNode start done ...")
232
	return nil
233 234 235
}

func (s *Server) Stop() error {
N
neza2017 已提交
236 237 238 239
	if s.closer != nil {
		if err := s.closer.Close(); err != nil {
			return err
		}
G
godchen 已提交
240 241
	}

242
	s.cancel()
243 244 245 246
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

G
godchen 已提交
247
	err := s.querynode.Stop()
248 249 250 251 252
	if err != nil {
		return err
	}
	s.wg.Wait()
	return nil
253 254
}

255
func (s *Server) SetRootCoord(rootCoord types.RootCoord) error {
256
	return s.querynode.SetRootCoord(rootCoord)
257 258
}

259 260
func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error {
	return s.querynode.SetIndexCoord(indexCoord)
261 262
}

G
godchen 已提交
263 264
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
	return s.querynode.GetTimeTickChannel(ctx)
265 266
}

G
godchen 已提交
267 268
func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
	return s.querynode.GetStatisticsChannel(ctx)
269 270
}

G
godchen 已提交
271
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
272
	// ignore ctx and in
G
godchen 已提交
273
	return s.querynode.GetComponentStates(ctx)
274 275
}

G
godchen 已提交
276
func (s *Server) AddQueryChannel(ctx context.Context, req *querypb.AddQueryChannelRequest) (*commonpb.Status, error) {
277
	// ignore ctx
G
godchen 已提交
278
	return s.querynode.AddQueryChannel(ctx, req)
279 280
}

G
godchen 已提交
281
func (s *Server) RemoveQueryChannel(ctx context.Context, req *querypb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
282
	// ignore ctx
G
godchen 已提交
283
	return s.querynode.RemoveQueryChannel(ctx, req)
284 285
}

G
godchen 已提交
286
func (s *Server) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
287
	// ignore ctx
G
godchen 已提交
288
	return s.querynode.WatchDmChannels(ctx, req)
289 290
}

G
godchen 已提交
291
func (s *Server) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
292
	// ignore ctx
G
godchen 已提交
293
	return s.querynode.LoadSegments(ctx, req)
294 295
}

G
godchen 已提交
296
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
297
	// ignore ctx
G
godchen 已提交
298
	return s.querynode.ReleaseCollection(ctx, req)
B
bigsheeper 已提交
299 300
}

G
godchen 已提交
301
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
302
	// ignore ctx
G
godchen 已提交
303
	return s.querynode.ReleasePartitions(ctx, req)
B
bigsheeper 已提交
304 305
}

G
godchen 已提交
306
func (s *Server) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
307
	// ignore ctx
G
godchen 已提交
308
	return s.querynode.ReleaseSegments(ctx, req)
309
}
B
bigsheeper 已提交
310

G
godchen 已提交
311 312
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
	return s.querynode.GetSegmentInfo(ctx, req)
B
bigsheeper 已提交
313
}