service.go 13.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12 13 14 15
package grpcproxynode

import (
	"context"
G
godchen 已提交
16
	"fmt"
S
sunby 已提交
17
	"io"
18
	"math"
19 20 21
	"net"
	"strconv"
	"sync"
22 23
	"time"

24
	"go.uber.org/zap"
25 26
	"google.golang.org/grpc"

X
Xiangyu Wang 已提交
27 28 29 30 31
	grpcdataserviceclient "github.com/milvus-io/milvus/internal/distributed/dataservice/client"
	grpcindexserviceclient "github.com/milvus-io/milvus/internal/distributed/indexservice/client"
	grpcmasterserviceclient "github.com/milvus-io/milvus/internal/distributed/masterservice/client"
	grpcproxyserviceclient "github.com/milvus-io/milvus/internal/distributed/proxyservice/client"
	grpcqueryserviceclient "github.com/milvus-io/milvus/internal/distributed/queryservice/client"
G
godchen 已提交
32
	otgrpc "github.com/opentracing-contrib/go-grpc"
33

X
Xiangyu Wang 已提交
34 35 36 37 38 39 40 41 42
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/proxynode"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/trace"
43
	"github.com/opentracing/opentracing-go"
44 45
)

46 47 48 49
// GRPCMaxMagSize is the receive-size limit, in bytes, that startGrpcLoop hands
// to grpc.MaxRecvMsgSize. Its value is 2 GiB (2147483648). The "Mag" spelling
// is kept because the identifier is exported.
const GRPCMaxMagSize = 2 * 1024 * 1024 * 1024

50
type Server struct {
Z
zhenshan.cao 已提交
51 52
	ctx        context.Context
	wg         sync.WaitGroup
G
godchen 已提交
53
	proxynode  *proxynode.ProxyNode
Z
zhenshan.cao 已提交
54
	grpcServer *grpc.Server
55

Z
zhenshan.cao 已提交
56
	grpcErrChan chan error
57

58 59 60
	proxyServiceClient  *grpcproxyserviceclient.Client
	masterServiceClient *grpcmasterserviceclient.GrpcClient
	dataServiceClient   *grpcdataserviceclient.Client
Z
zhenshan.cao 已提交
61 62
	queryServiceClient  *grpcqueryserviceclient.Client
	indexServiceClient  *grpcindexserviceclient.Client
S
sunby 已提交
63 64 65

	tracer opentracing.Tracer
	closer io.Closer
Z
zhenshan.cao 已提交
66
}
67

G
groot 已提交
68
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
69

S
sunby 已提交
70
	var err error
Z
zhenshan.cao 已提交
71
	server := &Server{
X
Xiangyu Wang 已提交
72
		ctx:         ctx,
Z
zhenshan.cao 已提交
73
		grpcErrChan: make(chan error),
74 75
	}

G
godchen 已提交
76
	server.proxynode, err = proxynode.NewProxyNode(server.ctx, factory)
77
	if err != nil {
Z
zhenshan.cao 已提交
78
		return nil, err
79
	}
Z
zhenshan.cao 已提交
80 81
	return server, err
}
82

Z
zhenshan.cao 已提交
83
func (s *Server) startGrpcLoop(grpcPort int) {
84

Z
zhenshan.cao 已提交
85
	defer s.wg.Done()
86

87
	log.Debug("proxynode", zap.Int("network port", grpcPort))
Z
zhenshan.cao 已提交
88
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
89
	if err != nil {
90
		log.Warn("proxynode", zap.String("Server:failed to listen:", err.Error()))
Z
zhenshan.cao 已提交
91 92
		s.grpcErrChan <- err
		return
93 94
	}

Z
zhenshan.cao 已提交
95 96
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()
97

G
godchen 已提交
98
	tracer := opentracing.GlobalTracer()
99 100 101 102 103
	s.grpcServer = grpc.NewServer(
		grpc.MaxRecvMsgSize(math.MaxInt32),
		grpc.MaxSendMsgSize(math.MaxInt32),
		grpc.UnaryInterceptor(
			otgrpc.OpenTracingServerInterceptor(tracer)),
G
godchen 已提交
104
		grpc.StreamInterceptor(
105 106
			otgrpc.OpenTracingStreamServerInterceptor(tracer)),
		grpc.MaxRecvMsgSize(GRPCMaxMagSize))
Z
zhenshan.cao 已提交
107 108
	proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
109

Z
zhenshan.cao 已提交
110 111 112
	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
113 114
	}

Z
zhenshan.cao 已提交
115
}
116

Z
zhenshan.cao 已提交
117
func (s *Server) Run() error {
118

Z
zhenshan.cao 已提交
119
	if err := s.init(); err != nil {
120
		return err
121
	}
122
	log.Debug("proxy node init done ...")
123

Z
zhenshan.cao 已提交
124
	if err := s.start(); err != nil {
125 126
		return err
	}
127
	log.Debug("proxy node start done ...")
128 129 130
	return nil
}

Z
zhenshan.cao 已提交
131
func (s *Server) init() error {
132
	ctx := context.Background()
Z
zhenshan.cao 已提交
133
	var err error
134
	Params.Init()
Z
zhenshan.cao 已提交
135 136
	if !funcutil.CheckPortAvailable(Params.Port) {
		Params.Port = funcutil.GetAvailablePort()
137
		log.Warn("ProxyNode init", zap.Any("Port", Params.Port))
Z
zhenshan.cao 已提交
138
	}
Z
zhenshan.cao 已提交
139 140
	Params.LoadFromEnv()
	Params.LoadFromArgs()
Z
zhenshan.cao 已提交
141
	Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
D
dragondriver 已提交
142

B
bigsheeper 已提交
143 144 145 146 147 148 149
	proxynode.Params.Init()
	log.Debug("init params done ...")
	proxynode.Params.NetworkPort = Params.Port
	proxynode.Params.IP = Params.IP
	proxynode.Params.NetworkAddress = Params.Address
	// for purpose of ID Allocator
	proxynode.Params.MasterAddress = Params.MasterAddress
150

N
neza2017 已提交
151
	closer := trace.InitTracing(fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port))
G
godchen 已提交
152 153
	s.closer = closer

B
bigsheeper 已提交
154 155 156 157
	log.Debug("proxynode", zap.String("proxy host", Params.IP))
	log.Debug("proxynode", zap.Int("proxy port", Params.Port))
	log.Debug("proxynode", zap.String("proxy address", Params.Address))

Z
zhenshan.cao 已提交
158
	defer func() {
D
dragondriver 已提交
159
		if err != nil {
Z
zhenshan.cao 已提交
160 161
			err2 := s.Stop()
			if err2 != nil {
162
				log.Debug("Init failed, and Stop failed")
Z
zhenshan.cao 已提交
163
			}
D
dragondriver 已提交
164
		}
Z
zhenshan.cao 已提交
165
	}()
D
dragondriver 已提交
166

167 168 169 170 171
	err = s.proxynode.Register()
	if err != nil {
		return err
	}

Z
zhenshan.cao 已提交
172
	s.wg.Add(1)
X
Xiangyu Wang 已提交
173
	go s.startGrpcLoop(Params.Port)
Z
zhenshan.cao 已提交
174 175
	// wait for grpc server loop start
	err = <-s.grpcErrChan
176
	log.Debug("create grpc server ...")
Z
zhenshan.cao 已提交
177 178
	if err != nil {
		return err
D
dragondriver 已提交
179
	}
Z
zhenshan.cao 已提交
180

181
	s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
Z
zhenshan.cao 已提交
182 183 184
	err = s.proxyServiceClient.Init()
	if err != nil {
		return err
D
dragondriver 已提交
185
	}
G
godchen 已提交
186
	s.proxynode.SetProxyServiceClient(s.proxyServiceClient)
187
	log.Debug("set proxy service client ...")
Z
zhenshan.cao 已提交
188 189

	masterServiceAddr := Params.MasterAddress
190
	log.Debug("proxynode", zap.String("master address", masterServiceAddr))
Z
zhenshan.cao 已提交
191
	timeout := 3 * time.Second
N
neza2017 已提交
192
	s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
Z
zhenshan.cao 已提交
193 194
	if err != nil {
		return err
D
dragondriver 已提交
195
	}
Z
zhenshan.cao 已提交
196
	err = s.masterServiceClient.Init()
D
dragondriver 已提交
197
	if err != nil {
Z
zhenshan.cao 已提交
198
		return err
D
dragondriver 已提交
199
	}
Z
zhenshan.cao 已提交
200
	err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 1000000, time.Millisecond*200)
201 202 203 204

	if err != nil {
		panic(err)
	}
G
godchen 已提交
205
	s.proxynode.SetMasterClient(s.masterServiceClient)
206
	log.Debug("set master client ...")
D
dragondriver 已提交
207

Z
zhenshan.cao 已提交
208
	dataServiceAddr := Params.DataServiceAddress
209
	log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
N
neza2017 已提交
210
	s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10)
Z
zhenshan.cao 已提交
211 212 213 214
	err = s.dataServiceClient.Init()
	if err != nil {
		return err
	}
G
godchen 已提交
215
	s.proxynode.SetDataServiceClient(s.dataServiceClient)
216
	log.Debug("set data service address ...")
Z
zhenshan.cao 已提交
217 218

	indexServiceAddr := Params.IndexServerAddress
219
	log.Debug("proxynode", zap.String("index server address", indexServiceAddr))
G
godchen 已提交
220
	s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, 10)
Z
zhenshan.cao 已提交
221
	err = s.indexServiceClient.Init()
222 223 224
	if err != nil {
		return err
	}
G
godchen 已提交
225
	s.proxynode.SetIndexServiceClient(s.indexServiceClient)
226
	log.Debug("set index service client ...")
227

D
dragondriver 已提交
228
	queryServiceAddr := Params.QueryServiceAddress
229
	log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
G
godchen 已提交
230
	s.queryServiceClient, err = grpcqueryserviceclient.NewClient(ctx, queryServiceAddr, proxynode.Params.MetaRootPath, []string{proxynode.Params.EtcdAddress}, timeout)
D
dragondriver 已提交
231 232 233 234 235 236 237
	if err != nil {
		return err
	}
	err = s.queryServiceClient.Init()
	if err != nil {
		return err
	}
G
godchen 已提交
238
	s.proxynode.SetQueryServiceClient(s.queryServiceClient)
239
	log.Debug("set query service client ...")
240

G
godchen 已提交
241
	s.proxynode.UpdateStateCode(internalpb.StateCode_Initializing)
D
dragondriver 已提交
242 243
	log.Debug("proxynode",
		zap.Any("state of proxynode", internalpb.StateCode_Initializing))
244

G
godchen 已提交
245 246
	if err := s.proxynode.Init(); err != nil {
		log.Debug("proxynode", zap.String("proxynode init error", err.Error()))
Z
zhenshan.cao 已提交
247 248 249 250 251
		return err
	}

	return nil
}
252

Z
zhenshan.cao 已提交
253
func (s *Server) start() error {
G
godchen 已提交
254
	return s.proxynode.Start()
255 256 257 258
}

func (s *Server) Stop() error {
	var err error
N
neza2017 已提交
259 260 261 262 263
	if s.closer != nil {
		if err = s.closer.Close(); err != nil {
			return err
		}
	}
264 265 266 267 268

	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

G
godchen 已提交
269
	err = s.proxynode.Stop()
270 271 272 273 274 275 276 277 278
	if err != nil {
		return err
	}

	s.wg.Wait()

	return nil
}

G
godchen 已提交
279 280 281 282 283 284 285 286
// GetComponentStates delegates to the wrapped ProxyNode's GetComponentStates.
// Only ctx is forwarded; request is not used.
func (s *Server) GetComponentStates(ctx context.Context, request *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
	return s.proxynode.GetComponentStates(ctx)
}

// GetStatisticsChannel delegates to the wrapped ProxyNode's
// GetStatisticsChannel. Only ctx is forwarded; request is not used.
func (s *Server) GetStatisticsChannel(ctx context.Context, request *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) {
	return s.proxynode.GetStatisticsChannel(ctx)
}

287
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
G
godchen 已提交
288
	return s.proxynode.InvalidateCollectionMetaCache(ctx, request)
289 290 291
}

func (s *Server) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
292
	return s.proxynode.CreateCollection(ctx, request)
293 294 295
}

func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
296
	return s.proxynode.DropCollection(ctx, request)
297 298 299
}

func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
300
	return s.proxynode.HasCollection(ctx, request)
301 302 303
}

func (s *Server) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
304
	return s.proxynode.LoadCollection(ctx, request)
305 306 307
}

func (s *Server) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
308
	return s.proxynode.ReleaseCollection(ctx, request)
309 310 311
}

func (s *Server) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
312
	return s.proxynode.DescribeCollection(ctx, request)
313 314
}

G
godchen 已提交
315 316
func (s *Server) GetCollectionStatistics(ctx context.Context, request *milvuspb.GetCollectionStatisticsRequest) (*milvuspb.GetCollectionStatisticsResponse, error) {
	return s.proxynode.GetCollectionStatistics(ctx, request)
317 318
}

G
godchen 已提交
319 320
func (s *Server) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
	return s.proxynode.ShowCollections(ctx, request)
321 322 323
}

func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
324
	return s.proxynode.CreatePartition(ctx, request)
325 326 327
}

func (s *Server) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
328
	return s.proxynode.DropPartition(ctx, request)
329 330 331
}

func (s *Server) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
332
	return s.proxynode.HasPartition(ctx, request)
333 334
}

G
godchen 已提交
335 336
func (s *Server) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitionsRequest) (*commonpb.Status, error) {
	return s.proxynode.LoadPartitions(ctx, request)
337 338
}

G
godchen 已提交
339 340
func (s *Server) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	return s.proxynode.ReleasePartitions(ctx, request)
341 342
}

G
godchen 已提交
343 344
func (s *Server) GetPartitionStatistics(ctx context.Context, request *milvuspb.GetPartitionStatisticsRequest) (*milvuspb.GetPartitionStatisticsResponse, error) {
	return s.proxynode.GetPartitionStatistics(ctx, request)
345 346
}

G
godchen 已提交
347 348
func (s *Server) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
	return s.proxynode.ShowPartitions(ctx, request)
349 350 351
}

func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
352
	return s.proxynode.CreateIndex(ctx, request)
353 354
}

B
BossZou 已提交
355
func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
356
	return s.proxynode.DropIndex(ctx, request)
B
BossZou 已提交
357 358
}

359
func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
G
godchen 已提交
360
	return s.proxynode.DescribeIndex(ctx, request)
361 362
}

363 364 365 366 367 368
// GetIndexBuildProgress gets the index build progress for the given
// field_name and index_name. In the response, IndexRows is the number of
// indexed rows and TotalRows is the total number of segment rows.
//
// The call is delegated to the wrapped ProxyNode's GetIndexBuildProgress and
// its result is returned unchanged.
func (s *Server) GetIndexBuildProgress(ctx context.Context, request *milvuspb.GetIndexBuildProgressRequest) (*milvuspb.GetIndexBuildProgressResponse, error) {
	return s.proxynode.GetIndexBuildProgress(ctx, request)
}

G
godchen 已提交
369 370
func (s *Server) GetIndexState(ctx context.Context, request *milvuspb.GetIndexStateRequest) (*milvuspb.GetIndexStateResponse, error) {
	return s.proxynode.GetIndexState(ctx, request)
371 372 373
}

func (s *Server) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
G
godchen 已提交
374
	return s.proxynode.Insert(ctx, request)
375 376 377
}

func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
G
godchen 已提交
378
	return s.proxynode.Search(ctx, request)
379 380
}

381 382 383 384
// Retrieve forwards ctx and request to the wrapped ProxyNode's Retrieve and
// returns its result unchanged.
func (s *Server) Retrieve(ctx context.Context, request *milvuspb.RetrieveRequest) (*milvuspb.RetrieveResults, error) {
	return s.proxynode.Retrieve(ctx, request)
}

385
func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
G
godchen 已提交
386
	return s.proxynode.Flush(ctx, request)
387 388
}

G
godchen 已提交
389 390
// GetDdChannel forwards ctx and request to the wrapped ProxyNode's
// GetDdChannel and returns its result unchanged.
func (s *Server) GetDdChannel(ctx context.Context, request *internalpb.GetDdChannelRequest) (*milvuspb.StringResponse, error) {
	return s.proxynode.GetDdChannel(ctx, request)
}
Z
zhenshan.cao 已提交
392

G
godchen 已提交
393 394
func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.GetPersistentSegmentInfoRequest) (*milvuspb.GetPersistentSegmentInfoResponse, error) {
	return s.proxynode.GetPersistentSegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
395
}
Z
zhenshan.cao 已提交
396

G
godchen 已提交
397 398
func (s *Server) GetQuerySegmentInfo(ctx context.Context, request *milvuspb.GetQuerySegmentInfoRequest) (*milvuspb.GetQuerySegmentInfoResponse, error) {
	return s.proxynode.GetQuerySegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
399 400

}
401

X
Xiangyu Wang 已提交
402 403 404 405
// Dummy forwards ctx and request to the wrapped ProxyNode's Dummy and returns
// its result unchanged.
func (s *Server) Dummy(ctx context.Context, request *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) {
	return s.proxynode.Dummy(ctx, request)
}

G
godchen 已提交
406 407
// RegisterLink forwards ctx and request to the wrapped ProxyNode's
// RegisterLink and returns its result unchanged.
func (s *Server) RegisterLink(ctx context.Context, request *milvuspb.RegisterLinkRequest) (*milvuspb.RegisterLinkResponse, error) {
	return s.proxynode.RegisterLink(ctx, request)
}