service.go 8.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcquerycoord
13 14

import (
X
xige-16 已提交
15
	"context"
B
bigsheeper 已提交
16
	"io"
17
	"math"
X
xige-16 已提交
18 19 20
	"net"
	"strconv"
	"sync"
21 22
	"time"

G
godchen 已提交
23
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
24
	dsc "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
25
	rcc "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
X
Xiangyu Wang 已提交
26 27
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
28
	qc "github.com/milvus-io/milvus/internal/querycoord"
X
Xiangyu Wang 已提交
29 30 31
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/trace"
G
godchen 已提交
32 33 34
	"go.uber.org/zap"
	"google.golang.org/grpc"

X
Xiangyu Wang 已提交
35 36 37 38
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
39 40 41
)

type Server struct {
42
	wg         sync.WaitGroup
X
xige-16 已提交
43
	loopCtx    context.Context
X
XuanYang-cn 已提交
44
	loopCancel context.CancelFunc
45 46 47
	grpcServer *grpc.Server

	grpcErrChan chan error
X
XuanYang-cn 已提交
48

49
	queryCoord *qc.QueryCoord
G
groot 已提交
50 51

	msFactory msgstream.Factory
52

53 54
	dataCoord *dsc.Client
	rootCoord *rcc.GrpcClient
B
bigsheeper 已提交
55 56

	closer io.Closer
X
XuanYang-cn 已提交
57 58
}

G
groot 已提交
59
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
X
XuanYang-cn 已提交
60
	ctx1, cancel := context.WithCancel(ctx)
61
	svr, err := qc.NewQueryCoord(ctx1, factory)
X
XuanYang-cn 已提交
62 63 64 65 66 67
	if err != nil {
		cancel()
		return nil, err
	}

	return &Server{
68 69 70 71 72
		queryCoord:  svr,
		loopCtx:     ctx1,
		loopCancel:  cancel,
		msFactory:   factory,
		grpcErrChan: make(chan error),
X
XuanYang-cn 已提交
73
	}, nil
X
xige-16 已提交
74 75
}

76 77 78 79 80
func (s *Server) Run() error {

	if err := s.init(); err != nil {
		return err
	}
81
	log.Debug("QueryCoord init done ...")
82 83

	if err := s.start(); err != nil {
84
		return err
85
	}
X
xige-16 已提交
86 87 88
	return nil
}

89 90
func (s *Server) init() error {
	Params.Init()
91 92
	qc.Params.Init()
	qc.Params.Port = Params.Port
B
bigsheeper 已提交
93

G
godchen 已提交
94
	closer := trace.InitTracing("querycoord")
B
bigsheeper 已提交
95
	s.closer = closer
X
XuanYang-cn 已提交
96

97
	if err := s.queryCoord.Register(); err != nil {
98 99 100
		return err
	}

101 102 103 104
	s.wg.Add(1)
	go s.startGrpcLoop(Params.Port)
	// wait for grpc server loop start
	if err := <-s.grpcErrChan; err != nil {
X
xige-16 已提交
105 106
		return err
	}
X
XuanYang-cn 已提交
107

108
	// --- Master Server Client ---
109
	log.Debug("QueryCoord try to new RootCoord client", zap.Any("RootCoordAddress", Params.RootCoordAddress))
G
godchen 已提交
110
	rootCoord, err := rcc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
X
XuanYang-cn 已提交
111
	if err != nil {
112
		log.Debug("QueryCoord try to new RootCoord client failed", zap.Error(err))
113 114 115
		panic(err)
	}

116
	if err = rootCoord.Init(); err != nil {
117
		log.Debug("QueryCoord RootCoordClient Init failed", zap.Error(err))
118 119 120
		panic(err)
	}

121
	if err = rootCoord.Start(); err != nil {
122
		log.Debug("QueryCoord RootCoordClient Start failed", zap.Error(err))
123 124 125
		panic(err)
	}
	// wait for master init or healthy
126
	log.Debug("QueryCoord try to wait for RootCoord ready")
127
	err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, rootCoord, "RootCoord", 1000000, time.Millisecond*200)
128
	if err != nil {
129
		log.Debug("QueryCoord wait for RootCoord ready failed", zap.Error(err))
130 131 132
		panic(err)
	}

133
	if err := s.SetRootCoord(rootCoord); err != nil {
134 135
		panic(err)
	}
136
	log.Debug("QueryCoord report RootCoord ready")
137 138

	// --- Data service client ---
139
	log.Debug("QueryCoord try to new DataCoord client", zap.Any("DataCoordAddress", Params.DataCoordAddress))
140

G
godchen 已提交
141
	dataCoord, err := dsc.NewClient(s.loopCtx, qc.Params.MetaRootPath, qc.Params.EtcdEndpoints)
G
godchen 已提交
142
	if err != nil {
C
Cai Yudong 已提交
143
		log.Debug("QueryCoord try to new DataCoord client failed", zap.Error(err))
G
godchen 已提交
144 145
		panic(err)
	}
146
	if err = dataCoord.Init(); err != nil {
147
		log.Debug("QueryCoord DataCoordClient Init failed", zap.Error(err))
148 149
		panic(err)
	}
150
	if err = dataCoord.Start(); err != nil {
151
		log.Debug("QueryCoord DataCoordClient Start failed", zap.Error(err))
152 153
		panic(err)
	}
154
	log.Debug("QueryCoord try to wait for DataCoord ready")
155
	err = funcutil.WaitForComponentInitOrHealthy(s.loopCtx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
156
	if err != nil {
157
		log.Debug("QueryCoord wait for DataCoord ready failed", zap.Error(err))
158 159
		panic(err)
	}
160
	if err := s.SetDataCoord(dataCoord); err != nil {
161
		panic(err)
X
XuanYang-cn 已提交
162
	}
163
	log.Debug("QueryCoord report DataCoord ready")
X
XuanYang-cn 已提交
164

165 166 167
	s.queryCoord.UpdateStateCode(internalpb.StateCode_Initializing)
	log.Debug("QueryCoord", zap.Any("State", internalpb.StateCode_Initializing))
	if err := s.queryCoord.Init(); err != nil {
168 169
		return err
	}
X
xige-16 已提交
170
	return nil
X
xige-16 已提交
171 172
}

173 174 175 176
func (s *Server) startGrpcLoop(grpcPort int) {

	defer s.wg.Done()

B
bigsheeper 已提交
177
	log.Debug("network", zap.String("port", strconv.Itoa(grpcPort)))
178 179
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
	if err != nil {
B
bigsheeper 已提交
180
		log.Debug("GrpcServer:failed to listen:", zap.String("error", err.Error()))
181 182 183 184 185 186 187
		s.grpcErrChan <- err
		return
	}

	ctx, cancel := context.WithCancel(s.loopCtx)
	defer cancel()

G
godchen 已提交
188
	opts := trace.GetInterceptorOpts()
189 190 191 192
	s.grpcServer = grpc.NewServer(
		grpc.MaxRecvMsgSize(math.MaxInt32),
		grpc.MaxSendMsgSize(math.MaxInt32),
		grpc.UnaryInterceptor(
G
godchen 已提交
193
			grpc_opentracing.UnaryServerInterceptor(opts...)),
G
godchen 已提交
194
		grpc.StreamInterceptor(
G
godchen 已提交
195
			grpc_opentracing.StreamServerInterceptor(opts...)))
196
	querypb.RegisterQueryCoordServer(s.grpcServer, s)
197 198 199 200 201 202 203 204

	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
	}
}

func (s *Server) start() error {
205
	return s.queryCoord.Start()
206 207
}

N
neza2017 已提交
208
func (s *Server) Stop() error {
N
neza2017 已提交
209 210 211 212
	if s.closer != nil {
		if err := s.closer.Close(); err != nil {
			return err
		}
B
bigsheeper 已提交
213
	}
214
	err := s.queryCoord.Stop()
X
xige-16 已提交
215 216 217 218
	s.loopCancel()
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}
X
XuanYang-cn 已提交
219
	return err
X
xige-16 已提交
220 221
}

222
func (s *Server) SetRootCoord(m types.RootCoord) error {
223
	s.queryCoord.SetRootCoord(m)
G
godchen 已提交
224
	return nil
X
xige-16 已提交
225 226
}

227
func (s *Server) SetDataCoord(d types.DataCoord) error {
228
	s.queryCoord.SetDataCoord(d)
G
godchen 已提交
229
	return nil
X
xige-16 已提交
230 231
}

G
godchen 已提交
232
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
233
	return s.queryCoord.GetComponentStates(ctx)
X
xige-16 已提交
234 235
}

G
godchen 已提交
236
func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
237
	return s.queryCoord.GetTimeTickChannel(ctx)
238 239
}

G
godchen 已提交
240
func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
241
	return s.queryCoord.GetStatisticsChannel(ctx)
242 243
}

G
godchen 已提交
244
func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
245
	return s.queryCoord.ShowCollections(ctx, req)
246 247
}

X
xige-16 已提交
248
func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
249
	return s.queryCoord.LoadCollection(ctx, req)
250 251
}

X
xige-16 已提交
252
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
253
	return s.queryCoord.ReleaseCollection(ctx, req)
254 255
}

G
godchen 已提交
256
func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
257
	return s.queryCoord.ShowPartitions(ctx, req)
258 259
}

G
godchen 已提交
260
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
261
	return s.queryCoord.GetPartitionStates(ctx, req)
262 263
}

G
godchen 已提交
264
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
265
	return s.queryCoord.LoadPartitions(ctx, req)
266 267
}

G
godchen 已提交
268
func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
269
	return s.queryCoord.ReleasePartitions(ctx, req)
270 271
}

G
godchen 已提交
272
func (s *Server) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
273
	return s.queryCoord.CreateQueryChannel(ctx, req)
274 275
}

G
godchen 已提交
276
func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
277
	return s.queryCoord.GetSegmentInfo(ctx, req)
B
bigsheeper 已提交
278
}