service.go 11.7 KB
Newer Older
1 2 3 4
package grpcproxynode

import (
	"context"
G
godchen 已提交
5
	"fmt"
S
sunby 已提交
6
	"io"
7
	"math"
8 9 10
	"net"
	"strconv"
	"sync"
11 12
	"time"

13 14
	"go.uber.org/zap"

15 16
	"google.golang.org/grpc"

G
godchen 已提交
17
	otgrpc "github.com/opentracing-contrib/go-grpc"
18
	grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice/client"
Z
zhenshan.cao 已提交
19
	grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
20
	grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice/client"
S
sunby 已提交
21
	grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
Z
zhenshan.cao 已提交
22
	grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
23 24

	"github.com/opentracing/opentracing-go"
G
godchen 已提交
25
	"github.com/uber/jaeger-client-go/config"
26
	"github.com/zilliztech/milvus-distributed/internal/log"
S
sunby 已提交
27
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
28
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
S
sunby 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
30 31
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
Z
zhenshan.cao 已提交
32
	"github.com/zilliztech/milvus-distributed/internal/proxynode"
S
sunby 已提交
33
	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
34 35
)

36 37 38 39
// GRPCMaxMagSize is the receive-size limit, in bytes, that startGrpcLoop
// passes to grpc.MaxRecvMsgSize. 2 << 30 is 2147483648.
const GRPCMaxMagSize = 2 << 30

40
type Server struct {
Z
zhenshan.cao 已提交
41 42
	ctx        context.Context
	wg         sync.WaitGroup
43
	impl       *proxynode.ProxyNode
Z
zhenshan.cao 已提交
44
	grpcServer *grpc.Server
45

Z
zhenshan.cao 已提交
46
	grpcErrChan chan error
47

48 49 50
	proxyServiceClient  *grpcproxyserviceclient.Client
	masterServiceClient *grpcmasterserviceclient.GrpcClient
	dataServiceClient   *grpcdataserviceclient.Client
Z
zhenshan.cao 已提交
51 52
	queryServiceClient  *grpcqueryserviceclient.Client
	indexServiceClient  *grpcindexserviceclient.Client
S
sunby 已提交
53 54 55

	tracer opentracing.Tracer
	closer io.Closer
Z
zhenshan.cao 已提交
56
}
57

G
groot 已提交
58
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
59

S
sunby 已提交
60
	var err error
Z
zhenshan.cao 已提交
61
	server := &Server{
X
Xiangyu Wang 已提交
62
		ctx:         ctx,
Z
zhenshan.cao 已提交
63
		grpcErrChan: make(chan error),
64 65
	}

66
	server.impl, err = proxynode.NewProxyNode(server.ctx, factory)
67
	if err != nil {
Z
zhenshan.cao 已提交
68
		return nil, err
69
	}
Z
zhenshan.cao 已提交
70 71
	return server, err
}
72

Z
zhenshan.cao 已提交
73
func (s *Server) startGrpcLoop(grpcPort int) {
74

Z
zhenshan.cao 已提交
75
	defer s.wg.Done()
76

77
	log.Debug("proxynode", zap.Int("network port", grpcPort))
Z
zhenshan.cao 已提交
78
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
79
	if err != nil {
80
		log.Warn("proxynode", zap.String("Server:failed to listen:", err.Error()))
Z
zhenshan.cao 已提交
81 82
		s.grpcErrChan <- err
		return
83 84
	}

Z
zhenshan.cao 已提交
85 86
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()
87

G
godchen 已提交
88
	tracer := opentracing.GlobalTracer()
89 90 91 92 93
	s.grpcServer = grpc.NewServer(
		grpc.MaxRecvMsgSize(math.MaxInt32),
		grpc.MaxSendMsgSize(math.MaxInt32),
		grpc.UnaryInterceptor(
			otgrpc.OpenTracingServerInterceptor(tracer)),
G
godchen 已提交
94
		grpc.StreamInterceptor(
95 96
			otgrpc.OpenTracingStreamServerInterceptor(tracer)),
		grpc.MaxRecvMsgSize(GRPCMaxMagSize))
Z
zhenshan.cao 已提交
97 98
	proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
99

Z
zhenshan.cao 已提交
100 101 102
	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
103 104
	}

Z
zhenshan.cao 已提交
105
}
106

Z
zhenshan.cao 已提交
107
func (s *Server) Run() error {
108

Z
zhenshan.cao 已提交
109
	if err := s.init(); err != nil {
110
		return err
111
	}
112
	log.Debug("proxy node init done ...")
113

Z
zhenshan.cao 已提交
114
	if err := s.start(); err != nil {
115 116
		return err
	}
117
	log.Debug("proxy node start done ...")
118 119 120
	return nil
}

Z
zhenshan.cao 已提交
121
func (s *Server) init() error {
122
	ctx := context.Background()
Z
zhenshan.cao 已提交
123
	var err error
124
	Params.Init()
Z
zhenshan.cao 已提交
125 126 127
	if !funcutil.CheckPortAvailable(Params.Port) {
		Params.Port = funcutil.GetAvailablePort()
	}
Z
zhenshan.cao 已提交
128 129 130
	Params.LoadFromEnv()
	Params.LoadFromArgs()

Z
zhenshan.cao 已提交
131
	Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
D
dragondriver 已提交
132

133 134 135
	log.Debug("proxynode", zap.String("proxy host", Params.IP))
	log.Debug("proxynode", zap.Int("proxy port", Params.Port))
	log.Debug("proxynode", zap.String("proxy address", Params.Address))
136

G
godchen 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
	// TODO
	cfg := &config.Configuration{
		ServiceName: fmt.Sprintf("proxy_node ip: %s, port: %d", Params.IP, Params.Port),
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
	tracer, closer, err := cfg.NewTracer()
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(tracer)
	s.closer = closer

Z
zhenshan.cao 已提交
152
	defer func() {
D
dragondriver 已提交
153
		if err != nil {
Z
zhenshan.cao 已提交
154 155
			err2 := s.Stop()
			if err2 != nil {
156
				log.Debug("Init failed, and Stop failed")
Z
zhenshan.cao 已提交
157
			}
D
dragondriver 已提交
158
		}
Z
zhenshan.cao 已提交
159
	}()
D
dragondriver 已提交
160

Z
zhenshan.cao 已提交
161
	s.wg.Add(1)
X
Xiangyu Wang 已提交
162
	go s.startGrpcLoop(Params.Port)
Z
zhenshan.cao 已提交
163 164
	// wait for grpc server loop start
	err = <-s.grpcErrChan
165
	log.Debug("create grpc server ...")
Z
zhenshan.cao 已提交
166 167
	if err != nil {
		return err
D
dragondriver 已提交
168
	}
Z
zhenshan.cao 已提交
169

170
	s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
Z
zhenshan.cao 已提交
171 172 173
	err = s.proxyServiceClient.Init()
	if err != nil {
		return err
D
dragondriver 已提交
174
	}
Z
zhenshan.cao 已提交
175
	s.impl.SetProxyServiceClient(s.proxyServiceClient)
176
	log.Debug("set proxy service client ...")
Z
zhenshan.cao 已提交
177 178

	masterServiceAddr := Params.MasterAddress
179
	log.Debug("proxynode", zap.String("master address", masterServiceAddr))
Z
zhenshan.cao 已提交
180
	timeout := 3 * time.Second
181
	s.masterServiceClient, err = grpcmasterserviceclient.NewClient(masterServiceAddr, timeout)
Z
zhenshan.cao 已提交
182 183
	if err != nil {
		return err
D
dragondriver 已提交
184
	}
Z
zhenshan.cao 已提交
185
	err = s.masterServiceClient.Init()
D
dragondriver 已提交
186
	if err != nil {
Z
zhenshan.cao 已提交
187
		return err
D
dragondriver 已提交
188
	}
189 190 191 192 193
	err = funcutil.WaitForComponentHealthy(ctx, s.masterServiceClient, "MasterService", 100, time.Millisecond*200)

	if err != nil {
		panic(err)
	}
Z
zhenshan.cao 已提交
194
	s.impl.SetMasterClient(s.masterServiceClient)
195
	log.Debug("set master client ...")
D
dragondriver 已提交
196

Z
zhenshan.cao 已提交
197
	dataServiceAddr := Params.DataServiceAddress
198
	log.Debug("proxynode", zap.String("data service address", dataServiceAddr))
199
	s.dataServiceClient = grpcdataserviceclient.NewClient(dataServiceAddr)
Z
zhenshan.cao 已提交
200 201 202 203 204
	err = s.dataServiceClient.Init()
	if err != nil {
		return err
	}
	s.impl.SetDataServiceClient(s.dataServiceClient)
205
	log.Debug("set data service address ...")
Z
zhenshan.cao 已提交
206 207

	indexServiceAddr := Params.IndexServerAddress
208
	log.Debug("proxynode", zap.String("index server address", indexServiceAddr))
Z
zhenshan.cao 已提交
209 210
	s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
	err = s.indexServiceClient.Init()
211 212 213
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
214
	s.impl.SetIndexServiceClient(s.indexServiceClient)
215
	log.Debug("set index service client ...")
216

D
dragondriver 已提交
217
	queryServiceAddr := Params.QueryServiceAddress
218
	log.Debug("proxynode", zap.String("query server address", queryServiceAddr))
D
dragondriver 已提交
219 220 221 222 223 224 225 226 227
	s.queryServiceClient, err = grpcqueryserviceclient.NewClient(queryServiceAddr, timeout)
	if err != nil {
		return err
	}
	err = s.queryServiceClient.Init()
	if err != nil {
		return err
	}
	s.impl.SetQueryServiceClient(s.queryServiceClient)
228
	log.Debug("set query service client ...")
229

Z
zhenshan.cao 已提交
230
	proxynode.Params.Init()
231
	log.Debug("init params done ...")
Z
zhenshan.cao 已提交
232 233 234 235 236
	proxynode.Params.NetworkPort = Params.Port
	proxynode.Params.IP = Params.IP
	proxynode.Params.NetworkAddress = Params.Address
	// for purpose of ID Allocator
	proxynode.Params.MasterAddress = Params.MasterAddress
237

D
del-zhenwu 已提交
238
	s.impl.UpdateStateCode(internalpb2.StateCode_Initializing)
239

Z
zhenshan.cao 已提交
240
	if err := s.impl.Init(); err != nil {
241
		log.Debug("proxynode", zap.String("impl init error", err.Error()))
Z
zhenshan.cao 已提交
242 243 244 245 246
		return err
	}

	return nil
}
247

Z
zhenshan.cao 已提交
248
func (s *Server) start() error {
249 250 251 252 253
	return s.impl.Start()
}

func (s *Server) Stop() error {
	var err error
S
sunby 已提交
254
	s.closer.Close()
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274

	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

	err = s.impl.Stop()
	if err != nil {
		return err
	}

	s.wg.Wait()

	return nil
}

// InvalidateCollectionMetaCache passes ctx and request to
// s.impl.InvalidateCollectionMetaCache and returns its status and error
// unchanged.
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	return s.impl.InvalidateCollectionMetaCache(ctx, request)
}

func (s *Server) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
275
	return s.impl.CreateCollection(ctx, request)
276 277 278
}

func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
279
	return s.impl.DropCollection(ctx, request)
280 281 282
}

func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
283
	return s.impl.HasCollection(ctx, request)
284 285 286
}

func (s *Server) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
287
	return s.impl.LoadCollection(ctx, request)
288 289 290
}

func (s *Server) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
G
godchen 已提交
291
	return s.impl.ReleaseCollection(ctx, request)
292 293 294
}

func (s *Server) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
G
godchen 已提交
295
	return s.impl.DescribeCollection(ctx, request)
296 297 298
}

func (s *Server) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
G
godchen 已提交
299
	return s.impl.GetCollectionStatistics(ctx, request)
300 301 302
}

func (s *Server) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
G
godchen 已提交
303
	return s.impl.ShowCollections(ctx, request)
304 305 306
}

func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
307
	return s.impl.CreatePartition(ctx, request)
308 309 310
}

func (s *Server) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
311
	return s.impl.DropPartition(ctx, request)
312 313 314
}

func (s *Server) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
G
godchen 已提交
315
	return s.impl.HasPartition(ctx, request)
316 317 318
}

func (s *Server) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
G
godchen 已提交
319
	return s.impl.LoadPartitions(ctx, request)
320 321 322
}

func (s *Server) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
G
godchen 已提交
323
	return s.impl.ReleasePartitions(ctx, request)
324 325 326
}

func (s *Server) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
G
godchen 已提交
327
	return s.impl.GetPartitionStatistics(ctx, request)
328 329 330
}

func (s *Server) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
G
godchen 已提交
331
	return s.impl.ShowPartitions(ctx, request)
332 333 334
}

func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
335
	return s.impl.CreateIndex(ctx, request)
336 337
}

B
BossZou 已提交
338
func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
G
godchen 已提交
339
	return s.impl.DropIndex(ctx, request)
B
BossZou 已提交
340 341
}

342
func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
G
godchen 已提交
343
	return s.impl.DescribeIndex(ctx, request)
344 345 346
}

func (s *Server) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
G
godchen 已提交
347
	return s.impl.GetIndexState(ctx, request)
348 349 350
}

func (s *Server) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
G
godchen 已提交
351
	return s.impl.Insert(ctx, request)
352 353 354
}

func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
G
godchen 已提交
355
	return s.impl.Search(ctx, request)
356 357 358
}

func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
G
godchen 已提交
359
	return s.impl.Flush(ctx, request)
360 361 362
}

func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
G
godchen 已提交
363
	return s.impl.GetDdChannel(ctx, request)
364
}
Z
zhenshan.cao 已提交
365 366

func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
G
godchen 已提交
367
	return s.impl.GetPersistentSegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
368
}
Z
zhenshan.cao 已提交
369 370

func (s *Server) GetQuerySegmentInfo(ctx context.Context, request *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
G
godchen 已提交
371
	return s.impl.GetQuerySegmentInfo(ctx, request)
Z
zhenshan.cao 已提交
372 373

}
374 375 376 377

// RegisterLink calls s.impl.RegisterLink with the empty request and returns
// its response and error unchanged. Note that ctx is accepted but not
// forwarded: the wrapped call receives only empty.
func (s *Server) RegisterLink(ctx context.Context, empty *commonpb.Empty) (*milvuspb.RegisterLinkResponse, error) {
	return s.impl.RegisterLink(empty)
}