service.go 8.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcquerycoord
13 14

import (
X
xige-16 已提交
15
	"context"
B
bigsheeper 已提交
16
	"io"
X
xige-16 已提交
17 18 19
	"net"
	"strconv"
	"sync"
20 21
	"time"

G
godchen 已提交
22
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
23
	dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
24
	rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
X
Xiangyu Wang 已提交
25 26
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
27
	qc "github.com/milvus-io/milvus/internal/querycoord"
X
Xiangyu Wang 已提交
28 29 30
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/trace"
G
godchen 已提交
31 32 33
	"go.uber.org/zap"
	"google.golang.org/grpc"

X
Xiangyu Wang 已提交
34 35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
38 39 40
)

type Server struct {
41
	wg         sync.WaitGroup
X
xige-16 已提交
42
	loopCtx    context.Context
X
XuanYang-cn 已提交
43
	loopCancel context.CancelFunc
44 45 46
	grpcServer *grpc.Server

	grpcErrChan chan error
X
XuanYang-cn 已提交
47

48
	queryCoord *qc.QueryCoord
G
groot 已提交
49 50

	msFactory msgstream.Factory
51

52 53
	dataCoord *dsc.Client
	rootCoord *rcc.GrpcClient
B
bigsheeper 已提交
54 55

	closer io.Closer
X
XuanYang-cn 已提交
56 57
}

G
groot 已提交
58
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
X
XuanYang-cn 已提交
59
	ctx1, cancel := context.WithCancel(ctx)
60
	svr, err := qc.NewQueryCoord(ctx1, factory)
X
XuanYang-cn 已提交
61 62 63 64 65 66
	if err != nil {
		cancel()
		return nil, err
	}

	return &Server{
67 68 69 70 71
		queryCoord:  svr,
		loopCtx:     ctx1,
		loopCancel:  cancel,
		msFactory:   factory,
		grpcErrChan: make(chan error),
X
XuanYang-cn 已提交
72
	}, nil
X
xige-16 已提交
73 74
}

75 76 77 78 79
func (s *Server) Run() error {

	if err := s.init(); err != nil {
		return err
	}
80
	log.Debug("QueryCoord init done ...")
81 82

	if err := s.start(); err != nil {
83
		return err
84
	}
X
xige-16 已提交
85 86 87
	return nil
}

88 89
func (s *Server) init() error {
	Params.Init()
90 91
	qc.Params.Init()
	qc.Params.Port = Params.Port
B
bigsheeper 已提交
92

G
godchen 已提交
93
	closer := trace.InitTracing("querycoord")
B
bigsheeper 已提交
94
	s.closer = closer
X
XuanYang-cn 已提交
95

96
	if err := s.queryCoord.Register(); err != nil {
97 98 99
		return err
	}

100 101 102 103
	s.wg.Add(1)
	go s.startGrpcLoop(Params.Port)
	// wait for grpc server loop start
	if err := <-s.grpcErrChan; err != nil {
X
xige-16 已提交
104 105
		return err
	}
X
XuanYang-cn 已提交
106

107
	// --- Master Server Client ---
108
	log.Debug("QueryCoord try to new RootCoord client", zap.Any("RootCoordAddress", Params.RootCoordAddress))
G
godchen 已提交
109
	rootCoord, err := rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
X
XuanYang-cn 已提交
110
	if err != nil {
111
		log.Debug("QueryCoord try to new RootCoord client failed", zap.Error(err))
112 113 114
		panic(err)
	}

115
	if err = rootCoord.Init(); err != nil {
116
		log.Debug("QueryCoord RootCoordClient Init failed", zap.Error(err))
117 118 119
		panic(err)
	}

120
	if err = rootCoord.Start(); err != nil {
121
		log.Debug("QueryCoord RootCoordClient Start failed", zap.Error(err))
122 123 124
		panic(err)
	}
	// wait for master init or healthy
125
	log.Debug("QueryCoord try to wait for RootCoord ready")
126
	err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
127
	if err != nil {
128
		log.Debug("QueryCoord wait for RootCoord ready failed", zap.Error(err))
129 130 131
		panic(err)
	}

132
	if err := s.SetRootCoord(rootCoord); err != nil {
133 134
		panic(err)
	}
135
	log.Debug("QueryCoord report RootCoord ready")
136 137

	// --- Data service client ---
138
	log.Debug("QueryCoord try to new DataCoord client", zap.Any("DataCoordAddress", Params.DataCoordAddress))
139

G
godchen 已提交
140
	dataCoord, err := dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
G
godchen 已提交
141
	if err != nil {
C
Cai Yudong 已提交
142
		log.Debug("QueryCoord try to new DataCoord client failed", zap.Error(err))
G
godchen 已提交
143 144
		panic(err)
	}
145
	if err = dataCoord.Init(); err != nil {
146
		log.Debug("QueryCoord DataCoordClient Init failed", zap.Error(err))
147 148
		panic(err)
	}
149
	if err = dataCoord.Start(); err != nil {
150
		log.Debug("QueryCoord DataCoordClient Start failed", zap.Error(err))
151 152
		panic(err)
	}
153
	log.Debug("QueryCoord try to wait for DataCoord ready")
154
	err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
155
	if err != nil {
156
		log.Debug("QueryCoord wait for DataCoord ready failed", zap.Error(err))
157 158
		panic(err)
	}
159
	if err := s.SetDataCoord(dataCoord); err != nil {
160
		panic(err)
X
XuanYang-cn 已提交
161
	}
162
	log.Debug("QueryCoord report DataCoord ready")
X
XuanYang-cn 已提交
163

164 165 166
	s.queryCoord.UpdateStateCode(internalpb.StateCode_Initializing)
	log.Debug("QueryCoord", zap.Any("State", internalpb.StateCode_Initializing))
	if err := s.queryCoord.Init(); err != nil {
167 168
		return err
	}
X
xige-16 已提交
169
	return nil
X
xige-16 已提交
170 171
}

172 173 174 175
func (s *Server) startGrpcLoop(grpcPort int) {

	defer s.wg.Done()

B
bigsheeper 已提交
176
	log.Debug("network", zap.String("port", strconv.Itoa(grpcPort)))
177 178
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
	if err != nil {
B
bigsheeper 已提交
179
		log.Debug("GrpcServer:failed to listen:", zap.String("error", err.Error()))
180 181 182 183 184 185 186
		s.grpcErrChan <- err
		return
	}

	ctx, cancel := context.WithCancel(s.loopCtx)
	defer cancel()

G
godchen 已提交
187
	opts := trace.GetInterceptorOpts()
188
	s.grpcServer = grpc.NewServer(
189 190
		grpc.MaxRecvMsgSize(Params.ServerMaxRecvSize),
		grpc.MaxSendMsgSize(Params.ServerMaxSendSize),
191
		grpc.UnaryInterceptor(
G
godchen 已提交
192
			grpc_opentracing.UnaryServerInterceptor(opts...)),
G
godchen 已提交
193
		grpc.StreamInterceptor(
G
godchen 已提交
194
			grpc_opentracing.StreamServerInterceptor(opts...)))
195
	querypb.RegisterQueryCoordServer(s.grpcServer, s)
196 197 198 199 200 201 202 203

	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
	}
}

func (s *Server) start() error {
204
	return s.queryCoord.Start()
205 206
}

N
neza2017 已提交
207
func (s *Server) Stop() error {
N
neza2017 已提交
208 209 210 211
	if s.closer != nil {
		if err := s.closer.Close(); err != nil {
			return err
		}
B
bigsheeper 已提交
212
	}
213
	err := s.queryCoord.Stop()
X
xige-16 已提交
214 215 216 217
	s.loopCancel()
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}
X
XuanYang-cn 已提交
218
	return err
X
xige-16 已提交
219 220
}

221
func (s *Server) SetRootCoord(m types.RootCoord) error {
222
	s.queryCoord.SetRootCoord(m)
G
godchen 已提交
223
	return nil
X
xige-16 已提交
224 225
}

226
func (s *Server) SetDataCoord(d types.DataCoord) error {
227
	s.queryCoord.SetDataCoord(d)
G
godchen 已提交
228
	return nil
X
xige-16 已提交
229 230
}

G
godchen 已提交
231
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
232
	return s.queryCoord.GetComponentStates(ctx)
X
xige-16 已提交
233 234
}

G
godchen 已提交
235
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
236
	return s.queryCoord.GetTimeTickChannel(ctx)
237 238
}

G
godchen 已提交
239
func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
240
	return s.queryCoord.GetStatisticsChannel(ctx)
241 242
}

G
godchen 已提交
243
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
244
	return s.queryCoord.ShowCollections(ctx, req)
245 246
}

X
xige-16 已提交
247
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
248
	return s.queryCoord.LoadCollection(ctx, req)
249 250
}

X
xige-16 已提交
251
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
252
	return s.queryCoord.ReleaseCollection(ctx, req)
253 254
}

G
godchen 已提交
255
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
256
	return s.queryCoord.ShowPartitions(ctx, req)
257 258
}

G
godchen 已提交
259
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
260
	return s.queryCoord.GetPartitionStates(ctx, req)
261 262
}

G
godchen 已提交
263
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
264
	return s.queryCoord.LoadPartitions(ctx, req)
265 266
}

G
godchen 已提交
267
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
268
	return s.queryCoord.ReleasePartitions(ctx, req)
269 270
}

G
godchen 已提交
271
func (s *Server) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
272
	return s.queryCoord.CreateQueryChannel(ctx, req)
273 274
}

G
godchen 已提交
275
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
276
	return s.queryCoord.GetSegmentInfo(ctx, req)
B
bigsheeper 已提交
277
}
278 279 280 281

func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	return s.queryCoord.GetMetrics(ctx, req)
}