service.go 13.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12 13 14 15
package grpcproxynode

import (
	"context"
G
godchen 已提交
16
	"fmt"
S
sunby 已提交
17
	"io"
18
	"math"
19 20 21
	"net"
	"strconv"
	"sync"
22 23
	"time"

24
	"go.uber.org/zap"
25 26
	"google.golang.org/grpc"

X
Xiangyu Wang 已提交
27 28 29 30 31
	grpcdataserviceclient "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
	grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
	grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
	grpcproxyserviceclient "github.com/milvus-io/milvus/internal/distributed/proxyservice/client"
	grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
G
godchen 已提交
32
	otgrpc "github.com/opentracing-contrib/go-grpc"
33

X
Xiangyu Wang 已提交
34 35 36 37 38 39 40 41
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proxynode"
	"github.com/milvus-io/milvus/internal/util/funcutil"
G
godchen 已提交
42
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
43
	"github.com/milvus-io/milvus/internal/util/trace"
44
	"github.com/opentracing/opentracing-go"
45 46
)

47 48 49 50
// GRPCMaxMagSize is the maximum gRPC message size, in bytes, that the proxy
// node's gRPC server accepts on receive (2 GiB).
const GRPCMaxMagSize = 2 * 1024 * 1024 * 1024

51
type Server struct {
Z
zhenshan.cao 已提交
52 53
	ctx        context.Context
	wg         sync.WaitGroup
G
godchen 已提交
54
	proxynode  *proxynode.ProxyNode
Z
zhenshan.cao 已提交
55
	grpcServer *grpc.Server
56

Z
zhenshan.cao 已提交
57
	grpcErrChan chan error
58

59 60 61
	proxyServiceClient  *grpcproxyserviceclient.Client
	masterServiceClient *grpcmasterserviceclient.GrpcClient
	dataServiceClient   *grpcdataserviceclient.Client
Z
zhenshan.cao 已提交
62 63
	queryServiceClient  *grpcqueryserviceclient.Client
	indexServiceClient  *grpcindexserviceclient.Client
S
sunby 已提交
64 65 66

	tracer opentracing.Tracer
	closer io.Closer
Z
zhenshan.cao 已提交
67
}
68

G
groot 已提交
69
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
70

S
sunby 已提交
71
	var err error
Z
zhenshan.cao 已提交
72
	server := &Server{
X
Xiangyu Wang 已提交
73
		ctx:         ctx,
Z
zhenshan.cao 已提交
74
		grpcErrChan: make(chan error),
75 76
	}

G
godchen 已提交
77
	server.proxynode, err = proxynode.NewProxyNode(server.ctx, factory)
78
	if err != nil {
Z
zhenshan.cao 已提交
79
		return nil, err
80
	}
Z
zhenshan.cao 已提交
81 82
	return server, err
}
83

Z
zhenshan.cao 已提交
84
func (s *Server) startGrpcLoop(grpcPort int) {
85

Z
zhenshan.cao 已提交
86
	defer s.wg.Done()
87

88
	log.Debug("proxynode", zap.Int("network port", grpcPort))
Z
zhenshan.cao 已提交
89
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
90
	if err != nil {
91
		log.Warn("proxynode", zap.String("Server:failed to listen:", err.Error()))
Z
zhenshan.cao 已提交
92 93
		s.grpcErrChan <- err
		return
94 95
	}

Z
zhenshan.cao 已提交
96 97
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()
98

G
godchen 已提交
99
	tracer := opentracing.GlobalTracer()
100 101 102 103 104
	s.grpcServer = grpc.NewServer(
		grpc.MaxRecvMsgSize(math.MaxInt32),
		grpc.MaxSendMsgSize(math.MaxInt32),
		grpc.UnaryInterceptor(
			otgrpc.OpenTracingServerInterceptor(tracer)),
G
godchen 已提交
105
		grpc.StreamInterceptor(
106 107
			otgrpc.OpenTracingStreamServerInterceptor(tracer)),
		grpc.MaxRecvMsgSize(GRPCMaxMagSize))
Z
zhenshan.cao 已提交
108 109
	proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
110

Z
zhenshan.cao 已提交
111 112 113
	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
114 115
	}

Z
zhenshan.cao 已提交
116
}
117

Z
zhenshan.cao 已提交
118
func (s *Server) Run() error {
119

Z
zhenshan.cao 已提交
120
	if err := s.init(); err != nil {
121
		return err
122
	}
123
	log.Debug("proxy node init done ...")
124

Z
zhenshan.cao 已提交
125
	if err := s.start(); err != nil {
126 127
		return err
	}
128
	log.Debug("proxy node start done ...")
129 130 131
	return nil
}

Z
zhenshan.cao 已提交
132
func (s *Server) init() error {
133
	ctx := context.Background()
Z
zhenshan.cao 已提交
134
	var err error
135
	Params.Init()
Z
zhenshan.cao 已提交
136 137
	if !funcutil.CheckPortAvailable(Params.Port) {
		Params.Port = funcutil.GetAvailablePort()
138
		log.Warn("ProxyNode init", zap.Any("Port", Params.Port))
Z
zhenshan.cao 已提交
139
	}
Z
zhenshan.cao 已提交
140 141
	Params.LoadFromEnv()
	Params.LoadFromArgs()
Z
zhenshan.cao 已提交
142
	Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
D
dragondriver 已提交
143

B
bigsheeper 已提交
144 145 146 147 148 149 150
	proxynode.Params.Init()
	log.Debug("init params done ...")
	proxynode.Params.NetworkPort = Params.Port
	proxynode.Params.IP = Params.IP
	proxynode.Params.NetworkAddress = Params.Address
	// for purpose of ID Allocator
	proxynode.Params.MasterAddress = Params.MasterAddress
151

N
neza2017 已提交
152
	closer := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port))
G
godchen 已提交
153 154
	s.closer = closer

B
bigsheeper 已提交
155 156 157 158
	log.Debug("proxynode", zap.String("proxy host", Params.IP))
	log.Debug("proxynode", zap.Int("proxy port", Params.Port))
	log.Debug("proxynode", zap.String("proxy address", Params.Address))

Z
zhenshan.cao 已提交
159
	defer func() {
D
dragondriver 已提交
160
		if err != nil {
Z
zhenshan.cao 已提交
161 162
			err2 := s.Stop()
			if err2 != nil {
163
				log.Debug("Init failed, and Stop failed")
Z
zhenshan.cao 已提交
164
			}
D
dragondriver 已提交
165
		}
Z
zhenshan.cao 已提交
166
	}()
D
dragondriver 已提交
167

G
godchen 已提交
168 169 170 171 172
	self := sessionutil.NewSession("proxynode", funcutil.GetLocalIP()+":"+strconv.Itoa(Params.Port), false)
	sm := sessionutil.NewSessionManager(ctx, proxynode.Params.EtcdAddress, proxynode.Params.MetaRootPath, self)
	sm.Init()
	sessionutil.SetGlobalSessionManager(sm)

Z
zhenshan.cao 已提交
173
	s.wg.Add(1)
X
Xiangyu Wang 已提交
174
	go s.startGrpcLoop(Params.Port)
Z
zhenshan.cao 已提交
175 176
	// wait for grpc server loop start
	err = <-s.grpcErrChan
177
	log.Debug("create grpc server ...")
Z
zhenshan.cao 已提交
178 179
	if err != nil {
		return err
D
dragondriver 已提交
180
	}
Z
zhenshan.cao 已提交
181

182
	s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
Z
zhenshan.cao 已提交
183 184 185
	err = s.proxyServiceClient.Init()
	if err != nil {
		return err
D
dragondriver 已提交
186
	}
G
godchen 已提交
187
	s.proxynode.SetProxyServiceClient(s.proxyServiceClient)
188
	log.Debug("set proxy service client ...")
Z
zhenshan.cao 已提交
189 190

	masterServiceAddr := Params.MasterAddress
191
	log.Debug("proxynode", zap.String("master address", masterServiceAddr))
Z
zhenshan.cao 已提交
192
	timeout := 3 * time.Second
193
	s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, timeout)
Z
zhenshan.cao 已提交
194 195
	if err != nil {
		return err
D
dragondriver 已提交
196
	}
Z
zhenshan.cao 已提交
197
	err = s.masterServiceClient.Init()
D
dragondriver 已提交
198
	if err != nil {
Z
zhenshan.cao 已提交
199
		return err
D
dragondriver 已提交
200
	}
Z
zhenshan.cao 已提交
201
	err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
202 203 204 205

	if err != nil {
		panic(err)
	}
G
godchen 已提交
206
	s.proxynode.SetMasterClient(s.masterServiceClient)
207
	log.Debug("set master client ...")
D
dragondriver 已提交
208

Z
zhenshan.cao 已提交
209
	dataServiceAddr := Params.DataServiceAddress
210
	log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
211
	s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr)
Z
zhenshan.cao 已提交
212 213 214 215
	err = s.dataServiceClient.Init()
	if err != nil {
		return err
	}
G
godchen 已提交
216
	s.proxynode.SetDataServiceClient(s.dataServiceClient)
217
	log.Debug("set data service address ...")
Z
zhenshan.cao 已提交
218 219

	indexServiceAddr := Params.IndexServerAddress
220
	log.Debug("proxynode", zap.String("index server address", indexServiceAddr))
Z
zhenshan.cao 已提交
221 222
	s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
	err = s.indexServiceClient.Init()
223 224 225
	if err != nil {
		return err
	}
G
godchen 已提交
226
	s.proxynode.SetIndexServiceClient(s.indexServiceClient)
227
	log.Debug("set index service client ...")
228

D
dragondriver 已提交
229
	queryServiceAddr := Params.QueryServiceAddress
230
	log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
D
dragondriver 已提交
231 232 233 234 235 236 237 238
	s.queryServiceClient, err = grpcqueryserviceclient.NewClient(queryServiceAddr, timeout)
	if err != nil {
		return err
	}
	err = s.queryServiceClient.Init()
	if err != nil {
		return err
	}
G
godchen 已提交
239
	s.proxynode.SetQueryServiceClient(s.queryServiceClient)
240
	log.Debug("set query service client ...")
241

G
godchen 已提交
242
	s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing)
D
dragondriver 已提交
243 244
	log.Debug("proxynode",
		zap.Any("state of proxynode", internalpb.StateCode_Initializing))
245

G
godchen 已提交
246 247
	if err := s.proxynode.Init(); err != nil {
		log.Debug("proxynode", zap.String("proxynode init error", err.Error()))
Z
zhenshan.cao 已提交
248 249 250 251 252
		return err
	}

	return nil
}
253

Z
zhenshan.cao 已提交
254
func (s *Server) start() error {
G
godchen 已提交
255
	return s.proxynode.Start()
256 257 258 259
}

func (s *Server) Stop() error {
	var err error
N
neza2017 已提交
260 261 262 263 264
	if s.closer != nil {
		if err = s.closer.Close(); err != nil {
			return err
		}
	}
265 266 267 268 269

	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

G
godchen 已提交
270
	err = s.proxynode.Stop()
271 272 273 274 275 276 277 278 279
	if err != nil {
		return err
	}

	s.wg.Wait()

	return nil
}

G
godchen 已提交
280 281 282 283 284 285 286 287
// GetComponentStates is the gRPC handler for GetComponentStates. The request
// is not inspected; the call is answered by the wrapped proxy node.
func (s *Server) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
	node := s.proxynode
	return node.GetComponentStates(ctx)
}

// GetStatisticsChannel is the gRPC handler for GetStatisticsChannel. The
// request is not inspected; the call is answered by the wrapped proxy node.
func (s *Server) GetStatisticsChannel(ctx context.Context, request *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
	node := s.proxynode
	return node.GetStatisticsChannel(ctx)
}

288
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
G
godchen 已提交
289
	return s.proxynode.InvalidateCollectionMetaCache(ctx, request)
290 291 292
}

func (s *Server) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
293
	return s.proxynode.CreateCollection(ctx, request)
294 295 296
}

func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
297
	return s.proxynode.DropCollection(ctx, request)
298 299 300
}

func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
301
	return s.proxynode.HasCollection(ctx, request)
302 303 304
}

func (s *Server) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
305
	return s.proxynode.LoadCollection(ctx, request)
306 307 308
}

func (s *Server) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
309
	return s.proxynode.ReleaseCollection(ctx, request)
310 311 312
}

func (s *Server) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
313
	return s.proxynode.DescribeCollection(ctx, request)
314 315
}

G
godchen 已提交
316 317
func (s *Server) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
	return s.proxynode.GetCollectionStatistics(ctx, request)
318 319
}

G
godchen 已提交
320 321
func (s *Server) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
	return s.proxynode.ShowCollections(ctx, request)
322 323 324
}

func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
325
	return s.proxynode.CreatePartition(ctx, request)
326 327 328
}

func (s *Server) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
329
	return s.proxynode.DropPartition(ctx, request)
330 331 332
}

func (s *Server) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
333
	return s.proxynode.HasPartition(ctx, request)
334 335
}

G
godchen 已提交
336 337
func (s *Server) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
	return s.proxynode.LoadPartitions(ctx, request)
338 339
}

G
godchen 已提交
340 341
func (s *Server) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	return s.proxynode.ReleasePartitions(ctx, request)
342 343
}

G
godchen 已提交
344 345
func (s *Server) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
	return s.proxynode.GetPartitionStatistics(ctx, request)
346 347
}

G
godchen 已提交
348 349
func (s *Server) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
	return s.proxynode.ShowPartitions(ctx, request)
350 351 352
}

func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
353
	return s.proxynode.CreateIndex(ctx, request)
354 355
}

B
BossZou 已提交
356
func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
357
	return s.proxynode.DropIndex(ctx, request)
B
BossZou 已提交
358 359
}

360
func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
G
godchen 已提交
361
	return s.proxynode.DescribeIndex(ctx, request)
362 363
}

364 365 366 367 368 369
// GetIndexBuildProgress reports index build progress for the given field_name
// and index_name. In the response, IndexRows is the number of indexed rows and
// TotalRows is the total number of segment rows. The call is answered by the
// wrapped proxy node.
func (s *Server) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
	node := s.proxynode
	return node.GetIndexBuildProgress(ctx, request)
}

G
godchen 已提交
370 371
func (s *Server) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
	return s.proxynode.GetIndexState(ctx, request)
372 373 374
}

func (s *Server) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
G
godchen 已提交
375
	return s.proxynode.Insert(ctx, request)
376 377 378
}

func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
G
godchen 已提交
379
	return s.proxynode.Search(ctx, request)
380 381 382
}

func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
G
godchen 已提交
383
	return s.proxynode.Flush(ctx, request)
384 385
}

G
godchen 已提交
386 387
// GetDdChannel is the gRPC handler for GetDdChannel; it passes the request to
// the wrapped proxy node and returns its reply unchanged.
func (s *Server) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
	return s.proxynode.GetDdChannel(ctx, request)
}
Z
zhenshan.cao 已提交
389

G
godchen 已提交
390 391
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
	return s.proxynode.GetPersistentSegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
392
}
Z
zhenshan.cao 已提交
393

G
godchen 已提交
394 395
func (s *Server) GetQuerySegmentInfo(ctx context.Context, request *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
	return s.proxynode.GetQuerySegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
396 397

}
398

G
godchen 已提交
399 400
// RegisterLink is the gRPC handler for RegisterLink; it passes the request to
// the wrapped proxy node and returns its reply unchanged.
func (s *Server) RegisterLink(ctx context.Context, request *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
	return s.proxynode.RegisterLink(ctx, request)
}