service.go 10.6 KB
Newer Older
1 2 3 4
package grpcproxynode

import (
	"context"
S
sunby 已提交
5 6
	"fmt"
	"io"
Z
zhenshan.cao 已提交
7
	"log"
8
	"net"
D
dragondriver 已提交
9
	"os"
10 11
	"strconv"
	"sync"
12 13
	"time"

S
sunby 已提交
14 15
	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
Z
zhenshan.cao 已提交
16 17 18
	grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
	grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
	grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
S
sunby 已提交
19
	grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"
Z
zhenshan.cao 已提交
20
	grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
S
sunby 已提交
21
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
22
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
S
sunby 已提交
23
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
24 25
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
Z
zhenshan.cao 已提交
26
	"github.com/zilliztech/milvus-distributed/internal/proxynode"
S
sunby 已提交
27 28
	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
	"google.golang.org/grpc"
29 30 31
)

type Server struct {
Z
zhenshan.cao 已提交
32 33 34 35
	ctx        context.Context
	wg         sync.WaitGroup
	impl       *proxynode.NodeImpl
	grpcServer *grpc.Server
36

Z
zhenshan.cao 已提交
37
	grpcErrChan chan error
38

Z
zhenshan.cao 已提交
39 40
	ip   string
	port int
41

Z
zhenshan.cao 已提交
42
	//todo
43
	proxyServiceClient *grpcproxyserviceclient.Client
44

Z
zhenshan.cao 已提交
45 46 47 48 49
	// todo InitParams Service addrs
	masterServiceClient *grcpmasterservice.GrpcClient
	dataServiceClient   *grpcdataservice.Client
	queryServiceClient  *grpcqueryserviceclient.Client
	indexServiceClient  *grpcindexserviceclient.Client
S
sunby 已提交
50 51 52

	tracer opentracing.Tracer
	closer io.Closer
Z
zhenshan.cao 已提交
53
}
54

G
groot 已提交
55
func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
56

S
sunby 已提交
57
	var err error
Z
zhenshan.cao 已提交
58
	server := &Server{
X
Xiangyu Wang 已提交
59
		ctx:         ctx,
Z
zhenshan.cao 已提交
60
		grpcErrChan: make(chan error),
61 62
	}

S
sunby 已提交
63 64 65 66 67 68 69 70 71 72 73 74 75
	cfg := &config.Configuration{
		ServiceName: "proxynode",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
	server.tracer, server.closer, err = cfg.NewTracer()
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(server.tracer)

G
groot 已提交
76
	server.impl, err = proxynode.NewProxyNodeImpl(server.ctx, factory)
77
	if err != nil {
Z
zhenshan.cao 已提交
78
		return nil, err
79
	}
Z
zhenshan.cao 已提交
80 81
	return server, err
}
82

Z
zhenshan.cao 已提交
83
func (s *Server) startGrpcLoop(grpcPort int) {
84

Z
zhenshan.cao 已提交
85
	defer s.wg.Done()
86

Z
zhenshan.cao 已提交
87 88
	log.Println("network port: ", grpcPort)
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
89
	if err != nil {
Z
zhenshan.cao 已提交
90 91 92
		log.Printf("GrpcServer:failed to listen: %v", err)
		s.grpcErrChan <- err
		return
93 94
	}

Z
zhenshan.cao 已提交
95 96
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()
97

Z
zhenshan.cao 已提交
98 99 100
	s.grpcServer = grpc.NewServer()
	proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
101

Z
zhenshan.cao 已提交
102 103 104
	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
105 106
	}

Z
zhenshan.cao 已提交
107
}
108

Z
zhenshan.cao 已提交
109
func (s *Server) Run() error {
110

Z
zhenshan.cao 已提交
111
	if err := s.init(); err != nil {
112
		return err
113
	}
114
	log.Println("proxy node init done ...")
115

Z
zhenshan.cao 已提交
116
	if err := s.start(); err != nil {
117 118
		return err
	}
119
	log.Println("proxy node start done ...")
120 121 122
	return nil
}

Z
zhenshan.cao 已提交
123 124
func (s *Server) init() error {
	var err error
125
	Params.Init()
D
dragondriver 已提交
126

Z
zhenshan.cao 已提交
127 128 129 130 131 132
	Params.IP = funcutil.GetLocalIP()
	host := os.Getenv("PROXY_NODE_HOST")
	if len(host) > 0 {
		Params.IP = host
	}

Z
zhenshan.cao 已提交
133 134 135
	Params.LoadFromEnv()
	Params.LoadFromArgs()

Z
zhenshan.cao 已提交
136 137
	Params.Port = funcutil.GetAvailablePort()
	Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
D
dragondriver 已提交
138

139 140 141 142
	log.Println("proxy host: ", Params.IP)
	log.Println("proxy port: ", Params.Port)
	log.Println("proxy address: ", Params.Address)

Z
zhenshan.cao 已提交
143
	defer func() {
D
dragondriver 已提交
144
		if err != nil {
Z
zhenshan.cao 已提交
145 146 147 148
			err2 := s.Stop()
			if err2 != nil {
				log.Println("Init failed, and Stop failed")
			}
D
dragondriver 已提交
149
		}
Z
zhenshan.cao 已提交
150
	}()
D
dragondriver 已提交
151

Z
zhenshan.cao 已提交
152
	s.wg.Add(1)
X
Xiangyu Wang 已提交
153
	go s.startGrpcLoop(Params.Port)
Z
zhenshan.cao 已提交
154 155
	// wait for grpc server loop start
	err = <-s.grpcErrChan
156
	log.Println("create grpc server ...")
Z
zhenshan.cao 已提交
157 158
	if err != nil {
		return err
D
dragondriver 已提交
159
	}
Z
zhenshan.cao 已提交
160

161
	s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
Z
zhenshan.cao 已提交
162 163 164
	err = s.proxyServiceClient.Init()
	if err != nil {
		return err
D
dragondriver 已提交
165
	}
Z
zhenshan.cao 已提交
166
	s.impl.SetProxyServiceClient(s.proxyServiceClient)
167
	log.Println("set proxy service client ...")
Z
zhenshan.cao 已提交
168 169

	masterServiceAddr := Params.MasterAddress
170
	log.Println("master address: ", masterServiceAddr)
Z
zhenshan.cao 已提交
171 172 173 174
	timeout := 3 * time.Second
	s.masterServiceClient, err = grcpmasterservice.NewGrpcClient(masterServiceAddr, timeout)
	if err != nil {
		return err
D
dragondriver 已提交
175
	}
Z
zhenshan.cao 已提交
176
	err = s.masterServiceClient.Init()
D
dragondriver 已提交
177
	if err != nil {
Z
zhenshan.cao 已提交
178
		return err
D
dragondriver 已提交
179
	}
Z
zhenshan.cao 已提交
180
	s.impl.SetMasterClient(s.masterServiceClient)
181
	log.Println("set master client ...")
D
dragondriver 已提交
182

Z
zhenshan.cao 已提交
183
	dataServiceAddr := Params.DataServiceAddress
C
cai.zhang 已提交
184
	log.Println("data service address ...", dataServiceAddr)
Z
zhenshan.cao 已提交
185 186 187 188 189 190
	s.dataServiceClient = grpcdataservice.NewClient(dataServiceAddr)
	err = s.dataServiceClient.Init()
	if err != nil {
		return err
	}
	s.impl.SetDataServiceClient(s.dataServiceClient)
191
	log.Println("set data service address ...")
Z
zhenshan.cao 已提交
192 193

	indexServiceAddr := Params.IndexServerAddress
194
	log.Println("index server address: ", indexServiceAddr)
Z
zhenshan.cao 已提交
195 196
	s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
	err = s.indexServiceClient.Init()
197 198 199
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
200
	s.impl.SetIndexServiceClient(s.indexServiceClient)
201 202
	log.Println("set index service client ...")

D
dragondriver 已提交
203 204 205 206 207 208 209 210 211 212 213 214
	queryServiceAddr := Params.QueryServiceAddress
	log.Println("query server address: ", queryServiceAddr)
	s.queryServiceClient, err = grpcqueryserviceclient.NewClient(queryServiceAddr, timeout)
	if err != nil {
		return err
	}
	err = s.queryServiceClient.Init()
	if err != nil {
		return err
	}
	s.impl.SetQueryServiceClient(s.queryServiceClient)
	log.Println("set query service client ...")
215

Z
zhenshan.cao 已提交
216
	proxynode.Params.Init()
217
	log.Println("init params done ...")
Z
zhenshan.cao 已提交
218 219 220 221 222
	proxynode.Params.NetworkPort = Params.Port
	proxynode.Params.IP = Params.IP
	proxynode.Params.NetworkAddress = Params.Address
	// for purpose of ID Allocator
	proxynode.Params.MasterAddress = Params.MasterAddress
223

Z
zhenshan.cao 已提交
224
	s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
225

Z
zhenshan.cao 已提交
226
	if err := s.impl.Init(); err != nil {
227
		log.Println("impl init error: ", err)
Z
zhenshan.cao 已提交
228 229 230 231 232
		return err
	}

	return nil
}
233

Z
zhenshan.cao 已提交
234
func (s *Server) start() error {
235 236 237 238 239
	return s.impl.Start()
}

func (s *Server) Stop() error {
	var err error
S
sunby 已提交
240
	s.closer.Close()
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260

	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

	err = s.impl.Stop()
	if err != nil {
		return err
	}

	s.wg.Wait()

	return nil
}

// InvalidateCollectionMetaCache passes ctx and request straight to the proxy
// node implementation and returns its status and error unchanged.
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	status, err := s.impl.InvalidateCollectionMetaCache(ctx, request)
	return status, err
}

func (s *Server) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
261
	return s.impl.CreateCollection(request)
262 263 264
}

func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
265
	return s.impl.DropCollection(request)
266 267 268
}

func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
Z
zhenshan.cao 已提交
269
	return s.impl.HasCollection(request)
270 271 272
}

func (s *Server) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
273
	return s.impl.LoadCollection(request)
274 275 276
}

func (s *Server) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
277
	return s.impl.ReleaseCollection(request)
278 279 280
}

func (s *Server) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
Z
zhenshan.cao 已提交
281
	return s.impl.DescribeCollection(request)
282 283 284
}

func (s *Server) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
Z
zhenshan.cao 已提交
285
	return s.impl.GetCollectionStatistics(request)
286 287 288
}

func (s *Server) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
Z
zhenshan.cao 已提交
289
	return s.impl.ShowCollections(request)
290 291 292
}

func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
293
	return s.impl.CreatePartition(request)
294 295 296
}

func (s *Server) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
297
	return s.impl.DropPartition(request)
298 299 300
}

func (s *Server) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
Z
zhenshan.cao 已提交
301
	return s.impl.HasPartition(request)
302 303 304
}

func (s *Server) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
305
	return s.impl.LoadPartitions(request)
306 307 308
}

func (s *Server) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
309
	return s.impl.ReleasePartitions(request)
310 311 312
}

func (s *Server) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
Z
zhenshan.cao 已提交
313
	return s.impl.GetPartitionStatistics(request)
314 315 316
}

func (s *Server) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
Z
zhenshan.cao 已提交
317
	return s.impl.ShowPartitions(request)
318 319 320
}

func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
321
	return s.impl.CreateIndex(request)
322 323
}

B
BossZou 已提交
324 325 326 327
// DropIndex delegates request to the proxy node implementation and returns
// its status and error unchanged. ctx is currently not forwarded.
func (s *Server) DropIndex(ctx context.Context, request *milvuspb.DropIndexRequest) (*commonpb.Status, error) {
	status, err := s.impl.DropIndex(request)
	return status, err
}

328
func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
Z
zhenshan.cao 已提交
329
	return s.impl.DescribeIndex(request)
330 331 332
}

func (s *Server) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
Z
zhenshan.cao 已提交
333
	return s.impl.GetIndexState(request)
334 335 336
}

func (s *Server) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
Z
zhenshan.cao 已提交
337
	return s.impl.Insert(request)
338 339 340
}

func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
Z
zhenshan.cao 已提交
341
	return s.impl.Search(request)
342 343 344
}

func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
345
	return s.impl.Flush(request)
346 347 348
}

func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
Z
zhenshan.cao 已提交
349
	return s.impl.GetDdChannel(request)
350
}
Z
zhenshan.cao 已提交
351 352

func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
X
XuanYang-cn 已提交
353
	return s.impl.GetPersistentSegmentInfo(request)
Z
zhenshan.cao 已提交
354
}
Z
zhenshan.cao 已提交
355 356 357 358 359

// GetQuerySegmentInfo delegates request to the proxy node implementation and
// returns its response and error unchanged. ctx is currently not forwarded.
func (s *Server) GetQuerySegmentInfo(ctx context.Context, request *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
	resp, err := s.impl.GetQuerySegmentInfo(request)
	return resp, err
}