service.go 13.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12 13 14 15
package grpcproxynode

import (
	"context"
G
godchen 已提交
16
	"fmt"
S
sunby 已提交
17
	"io"
18
	"math"
19 20 21
	"net"
	"strconv"
	"sync"
22 23
	"time"

24
	"go.uber.org/zap"
25 26
	"google.golang.org/grpc"

X
Xiangyu Wang 已提交
27 28 29 30 31
	grpcdataserviceclient "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
	grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
	grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
	grpcproxyserviceclient "github.com/milvus-io/milvus/internal/distributed/proxyservice/client"
	grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
G
godchen 已提交
32
	otgrpc "github.com/opentracing-contrib/go-grpc"
33

X
Xiangyu Wang 已提交
34 35 36 37 38 39 40 41 42
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proxynode"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/trace"
43
	"github.com/opentracing/opentracing-go"
44 45
)

46 47 48 49
const (
	GRPCMaxMagSize = 2 << 30
)

50
type Server struct {
Z
zhenshan.cao 已提交
51 52
	ctx        context.Context
	wg         sync.WaitGroup
G
godchen 已提交
53
	proxynode  *proxynode.ProxyNode
Z
zhenshan.cao 已提交
54
	grpcServer *grpc.Server
55

Z
zhenshan.cao 已提交
56
	grpcErrChan chan error
57

58 59 60
	proxyServiceClient  *grpcproxyserviceclient.Client
	masterServiceClient *grpcmasterserviceclient.GrpcClient
	dataServiceClient   *grpcdataserviceclient.Client
Z
zhenshan.cao 已提交
61 62
	queryServiceClient  *grpcqueryserviceclient.Client
	indexServiceClient  *grpcindexserviceclient.Client
S
sunby 已提交
63 64 65

	tracer opentracing.Tracer
	closer io.Closer
Z
zhenshan.cao 已提交
66
}
67

G
groot 已提交
68
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
69

S
sunby 已提交
70
	var err error
Z
zhenshan.cao 已提交
71
	server := &Server{
X
Xiangyu Wang 已提交
72
		ctx:         ctx,
Z
zhenshan.cao 已提交
73
		grpcErrChan: make(chan error),
74 75
	}

G
godchen 已提交
76
	server.proxynode, err = proxynode.NewProxyNode(server.ctx, factory)
77
	if err != nil {
Z
zhenshan.cao 已提交
78
		return nil, err
79
	}
Z
zhenshan.cao 已提交
80 81
	return server, err
}
82

Z
zhenshan.cao 已提交
83
func (s *Server) startGrpcLoop(grpcPort int) {
84

Z
zhenshan.cao 已提交
85
	defer s.wg.Done()
86

87
	log.Debug("proxynode", zap.Int("network port", grpcPort))
Z
zhenshan.cao 已提交
88
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
89
	if err != nil {
90
		log.Warn("proxynode", zap.String("Server:failed to listen:", err.Error()))
Z
zhenshan.cao 已提交
91 92
		s.grpcErrChan <- err
		return
93 94
	}

Z
zhenshan.cao 已提交
95 96
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()
97

G
godchen 已提交
98
	tracer := opentracing.GlobalTracer()
99 100 101 102 103
	s.grpcServer = grpc.NewServer(
		grpc.MaxRecvMsgSize(math.MaxInt32),
		grpc.MaxSendMsgSize(math.MaxInt32),
		grpc.UnaryInterceptor(
			otgrpc.OpenTracingServerInterceptor(tracer)),
G
godchen 已提交
104
		grpc.StreamInterceptor(
105 106
			otgrpc.OpenTracingStreamServerInterceptor(tracer)),
		grpc.MaxRecvMsgSize(GRPCMaxMagSize))
Z
zhenshan.cao 已提交
107 108
	proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
109

Z
zhenshan.cao 已提交
110 111 112
	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
113 114
	}

Z
zhenshan.cao 已提交
115
}
116

Z
zhenshan.cao 已提交
117
func (s *Server) Run() error {
118

Z
zhenshan.cao 已提交
119
	if err := s.init(); err != nil {
120
		return err
121
	}
122
	log.Debug("proxy node init done ...")
123

Z
zhenshan.cao 已提交
124
	if err := s.start(); err != nil {
125 126
		return err
	}
127
	log.Debug("proxy node start done ...")
128 129 130
	return nil
}

Z
zhenshan.cao 已提交
131
func (s *Server) init() error {
132
	ctx := context.Background()
Z
zhenshan.cao 已提交
133
	var err error
134
	Params.Init()
Z
zhenshan.cao 已提交
135 136
	if !funcutil.CheckPortAvailable(Params.Port) {
		Params.Port = funcutil.GetAvailablePort()
137
		log.Warn("ProxyNode init", zap.Any("Port", Params.Port))
Z
zhenshan.cao 已提交
138
	}
Z
zhenshan.cao 已提交
139 140
	Params.LoadFromEnv()
	Params.LoadFromArgs()
Z
zhenshan.cao 已提交
141
	Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
D
dragondriver 已提交
142

B
bigsheeper 已提交
143 144 145 146 147 148 149
	proxynode.Params.Init()
	log.Debug("init params done ...")
	proxynode.Params.NetworkPort = Params.Port
	proxynode.Params.IP = Params.IP
	proxynode.Params.NetworkAddress = Params.Address
	// for purpose of ID Allocator
	proxynode.Params.MasterAddress = Params.MasterAddress
150

N
neza2017 已提交
151
	closer := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port))
G
godchen 已提交
152 153
	s.closer = closer

B
bigsheeper 已提交
154 155 156 157
	log.Debug("proxynode", zap.String("proxy host", Params.IP))
	log.Debug("proxynode", zap.Int("proxy port", Params.Port))
	log.Debug("proxynode", zap.String("proxy address", Params.Address))

Z
zhenshan.cao 已提交
158
	defer func() {
D
dragondriver 已提交
159
		if err != nil {
Z
zhenshan.cao 已提交
160 161
			err2 := s.Stop()
			if err2 != nil {
162
				log.Debug("Init failed, and Stop failed")
Z
zhenshan.cao 已提交
163
			}
D
dragondriver 已提交
164
		}
Z
zhenshan.cao 已提交
165
	}()
D
dragondriver 已提交
166

Z
zhenshan.cao 已提交
167
	s.wg.Add(1)
X
Xiangyu Wang 已提交
168
	go s.startGrpcLoop(Params.Port)
Z
zhenshan.cao 已提交
169 170
	// wait for grpc server loop start
	err = <-s.grpcErrChan
171
	log.Debug("create grpc server ...")
Z
zhenshan.cao 已提交
172 173
	if err != nil {
		return err
D
dragondriver 已提交
174
	}
Z
zhenshan.cao 已提交
175

176
	s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
Z
zhenshan.cao 已提交
177 178 179
	err = s.proxyServiceClient.Init()
	if err != nil {
		return err
D
dragondriver 已提交
180
	}
G
godchen 已提交
181
	s.proxynode.SetProxyServiceClient(s.proxyServiceClient)
182
	log.Debug("set proxy service client ...")
Z
zhenshan.cao 已提交
183 184

	masterServiceAddr := Params.MasterAddress
185
	log.Debug("proxynode", zap.String("master address", masterServiceAddr))
Z
zhenshan.cao 已提交
186
	timeout := 3 * time.Second
187
	s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, []string{proxynode.Params.EtcdAddress}, timeout)
Z
zhenshan.cao 已提交
188 189
	if err != nil {
		return err
D
dragondriver 已提交
190
	}
Z
zhenshan.cao 已提交
191
	err = s.masterServiceClient.Init()
D
dragondriver 已提交
192
	if err != nil {
Z
zhenshan.cao 已提交
193
		return err
D
dragondriver 已提交
194
	}
Z
zhenshan.cao 已提交
195
	err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
196 197 198 199

	if err != nil {
		panic(err)
	}
G
godchen 已提交
200
	s.proxynode.SetMasterClient(s.masterServiceClient)
201
	log.Debug("set master client ...")
D
dragondriver 已提交
202

Z
zhenshan.cao 已提交
203
	dataServiceAddr := Params.DataServiceAddress
204
	log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
205
	s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr)
Z
zhenshan.cao 已提交
206 207 208 209
	err = s.dataServiceClient.Init()
	if err != nil {
		return err
	}
G
godchen 已提交
210
	s.proxynode.SetDataServiceClient(s.dataServiceClient)
211
	log.Debug("set data service address ...")
Z
zhenshan.cao 已提交
212 213

	indexServiceAddr := Params.IndexServerAddress
214
	log.Debug("proxynode", zap.String("index server address", indexServiceAddr))
Z
zhenshan.cao 已提交
215 216
	s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
	err = s.indexServiceClient.Init()
217 218 219
	if err != nil {
		return err
	}
G
godchen 已提交
220
	s.proxynode.SetIndexServiceClient(s.indexServiceClient)
221
	log.Debug("set index service client ...")
222

D
dragondriver 已提交
223
	queryServiceAddr := Params.QueryServiceAddress
224
	log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
D
dragondriver 已提交
225 226 227 228 229 230 231 232
	s.queryServiceClient, err = grpcqueryserviceclient.NewClient(queryServiceAddr, timeout)
	if err != nil {
		return err
	}
	err = s.queryServiceClient.Init()
	if err != nil {
		return err
	}
G
godchen 已提交
233
	s.proxynode.SetQueryServiceClient(s.queryServiceClient)
234
	log.Debug("set query service client ...")
235

G
godchen 已提交
236
	s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing)
D
dragondriver 已提交
237 238
	log.Debug("proxynode",
		zap.Any("state of proxynode", internalpb.StateCode_Initializing))
239

G
godchen 已提交
240 241
	if err := s.proxynode.Init(); err != nil {
		log.Debug("proxynode", zap.String("proxynode init error", err.Error()))
Z
zhenshan.cao 已提交
242 243 244 245 246
		return err
	}

	return nil
}
247

Z
zhenshan.cao 已提交
248
func (s *Server) start() error {
G
godchen 已提交
249
	return s.proxynode.Start()
250 251 252 253
}

func (s *Server) Stop() error {
	var err error
N
neza2017 已提交
254 255 256 257 258
	if s.closer != nil {
		if err = s.closer.Close(); err != nil {
			return err
		}
	}
259 260 261 262 263

	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

G
godchen 已提交
264
	err = s.proxynode.Stop()
265 266 267 268 269 270 271 272 273
	if err != nil {
		return err
	}

	s.wg.Wait()

	return nil
}

G
godchen 已提交
274 275 276 277 278 279 280 281
func (s *Server) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
	return s.proxynode.GetComponentStates(ctx)
}

func (s *Server) GetStatisticsChannel(ctx context.Context, request *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
	return s.proxynode.GetStatisticsChannel(ctx)
}

282
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
G
godchen 已提交
283
	return s.proxynode.InvalidateCollectionMetaCache(ctx, request)
284 285 286
}

func (s *Server) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
287
	return s.proxynode.CreateCollection(ctx, request)
288 289 290
}

func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
291
	return s.proxynode.DropCollection(ctx, request)
292 293 294
}

func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
295
	return s.proxynode.HasCollection(ctx, request)
296 297 298
}

func (s *Server) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
299
	return s.proxynode.LoadCollection(ctx, request)
300 301 302
}

func (s *Server) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
303
	return s.proxynode.ReleaseCollection(ctx, request)
304 305 306
}

func (s *Server) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
307
	return s.proxynode.DescribeCollection(ctx, request)
308 309
}

G
godchen 已提交
310 311
func (s *Server) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
	return s.proxynode.GetCollectionStatistics(ctx, request)
312 313
}

G
godchen 已提交
314 315
func (s *Server) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
	return s.proxynode.ShowCollections(ctx, request)
316 317 318
}

func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
319
	return s.proxynode.CreatePartition(ctx, request)
320 321 322
}

func (s *Server) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
323
	return s.proxynode.DropPartition(ctx, request)
324 325 326
}

func (s *Server) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
327
	return s.proxynode.HasPartition(ctx, request)
328 329
}

G
godchen 已提交
330 331
func (s *Server) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
	return s.proxynode.LoadPartitions(ctx, request)
332 333
}

G
godchen 已提交
334 335
func (s *Server) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	return s.proxynode.ReleasePartitions(ctx, request)
336 337
}

G
godchen 已提交
338 339
func (s *Server) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
	return s.proxynode.GetPartitionStatistics(ctx, request)
340 341
}

G
godchen 已提交
342 343
func (s *Server) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
	return s.proxynode.ShowPartitions(ctx, request)
344 345 346
}

func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
347
	return s.proxynode.CreateIndex(ctx, request)
348 349
}

B
BossZou 已提交
350
func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
351
	return s.proxynode.DropIndex(ctx, request)
B
BossZou 已提交
352 353
}

354
func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
G
godchen 已提交
355
	return s.proxynode.DescribeIndex(ctx, request)
356 357
}

358 359 360 361 362 363
// GetIndexBuildProgress gets index build progress with filed_name and index_name.
// IndexRows is the num of indexed rows. And TotalRows is the total number of segment rows.
func (s *Server) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
	return s.proxynode.GetIndexBuildProgress(ctx, request)
}

G
godchen 已提交
364 365
func (s *Server) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
	return s.proxynode.GetIndexState(ctx, request)
366 367 368
}

func (s *Server) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
G
godchen 已提交
369
	return s.proxynode.Insert(ctx, request)
370 371 372
}

func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
G
godchen 已提交
373
	return s.proxynode.Search(ctx, request)
374 375
}

376 377 378 379
func (s *Server) Retrieve(ctx context.Context, request *milvuspb.RetrieveRequest) (*milvuspb.RetrieveResults, error) {
	return s.proxynode.Retrieve(ctx, request)
}

380
func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
G
godchen 已提交
381
	return s.proxynode.Flush(ctx, request)
382 383
}

G
godchen 已提交
384 385
func (s *Server) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
	return s.proxynode.GetDdChannel(ctx, request)
386
}
Z
zhenshan.cao 已提交
387

G
godchen 已提交
388 389
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
	return s.proxynode.GetPersistentSegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
390
}
Z
zhenshan.cao 已提交
391

G
godchen 已提交
392 393
func (s *Server) GetQuerySegmentInfo(ctx context.Context, request *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
	return s.proxynode.GetQuerySegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
394 395

}
396

G
godchen 已提交
397 398
func (s *Server) RegisterLink(ctx context.Context, request *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
	return s.proxynode.RegisterLink(ctx, request)
399
}