service.go 9.8 KB
Newer Older
1 2 3 4
package grpcproxynode

import (
	"context"
Z
zhenshan.cao 已提交
5
	"log"
6
	"net"
D
dragondriver 已提交
7
	"os"
8 9
	"strconv"
	"sync"
10 11
	"time"

12 13
	grpcproxyserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice/client"

Z
zhenshan.cao 已提交
14
	"github.com/zilliztech/milvus-distributed/internal/util/funcutil"
15 16

	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
17

Z
zhenshan.cao 已提交
18
	"google.golang.org/grpc"
D
dragondriver 已提交
19

Z
zhenshan.cao 已提交
20 21 22 23
	grpcdataservice "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
	grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
	grcpmasterservice "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
	grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
24 25 26
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
	"github.com/zilliztech/milvus-distributed/internal/proto/proxypb"
Z
zhenshan.cao 已提交
27
	"github.com/zilliztech/milvus-distributed/internal/proxynode"
28 29 30
)

type Server struct {
Z
zhenshan.cao 已提交
31 32 33 34
	ctx        context.Context
	wg         sync.WaitGroup
	impl       *proxynode.NodeImpl
	grpcServer *grpc.Server
35

Z
zhenshan.cao 已提交
36
	grpcErrChan chan error
37

Z
zhenshan.cao 已提交
38 39
	ip   string
	port int
40

Z
zhenshan.cao 已提交
41
	//todo
42
	proxyServiceClient *grpcproxyserviceclient.Client
43

Z
zhenshan.cao 已提交
44 45 46 47 48 49
	// todo InitParams Service addrs
	masterServiceClient *grcpmasterservice.GrpcClient
	dataServiceClient   *grpcdataservice.Client
	queryServiceClient  *grpcqueryserviceclient.Client
	indexServiceClient  *grpcindexserviceclient.Client
}
50

X
Xiangyu Wang 已提交
51
func NewServer(ctx context.Context) (*Server, error) {
52

Z
zhenshan.cao 已提交
53
	server := &Server{
X
Xiangyu Wang 已提交
54
		ctx:         ctx,
Z
zhenshan.cao 已提交
55
		grpcErrChan: make(chan error),
56 57
	}

Z
zhenshan.cao 已提交
58 59
	var err error
	server.impl, err = proxynode.NewProxyNodeImpl(server.ctx)
60
	if err != nil {
Z
zhenshan.cao 已提交
61
		return nil, err
62
	}
Z
zhenshan.cao 已提交
63 64
	return server, err
}
65

Z
zhenshan.cao 已提交
66
func (s *Server) startGrpcLoop(grpcPort int) {
67

Z
zhenshan.cao 已提交
68
	defer s.wg.Done()
69

Z
zhenshan.cao 已提交
70 71
	log.Println("network port: ", grpcPort)
	lis, err := net.Listen("tcp", ":"+strconv.Itoa(grpcPort))
72
	if err != nil {
Z
zhenshan.cao 已提交
73 74 75
		log.Printf("GrpcServer:failed to listen: %v", err)
		s.grpcErrChan <- err
		return
76 77
	}

Z
zhenshan.cao 已提交
78 79
	ctx, cancel := context.WithCancel(s.ctx)
	defer cancel()
80

Z
zhenshan.cao 已提交
81 82 83
	s.grpcServer = grpc.NewServer()
	proxypb.RegisterProxyNodeServiceServer(s.grpcServer, s)
	milvuspb.RegisterMilvusServiceServer(s.grpcServer, s)
84

Z
zhenshan.cao 已提交
85 86 87
	go funcutil.CheckGrpcReady(ctx, s.grpcErrChan)
	if err := s.grpcServer.Serve(lis); err != nil {
		s.grpcErrChan <- err
88 89
	}

Z
zhenshan.cao 已提交
90
}
91

Z
zhenshan.cao 已提交
92
func (s *Server) Run() error {
93

Z
zhenshan.cao 已提交
94
	if err := s.init(); err != nil {
95
		return err
96
	}
97
	log.Println("proxy node init done ...")
98

Z
zhenshan.cao 已提交
99
	if err := s.start(); err != nil {
100 101
		return err
	}
102
	log.Println("proxy node start done ...")
103 104 105
	return nil
}

Z
zhenshan.cao 已提交
106 107
func (s *Server) init() error {
	var err error
108
	Params.Init()
D
dragondriver 已提交
109

Z
zhenshan.cao 已提交
110 111 112 113 114 115 116 117
	Params.IP = funcutil.GetLocalIP()
	host := os.Getenv("PROXY_NODE_HOST")
	if len(host) > 0 {
		Params.IP = host
	}

	Params.Port = funcutil.GetAvailablePort()
	Params.Address = Params.IP + ":" + strconv.FormatInt(int64(Params.Port), 10)
D
dragondriver 已提交
118

119 120 121 122
	log.Println("proxy host: ", Params.IP)
	log.Println("proxy port: ", Params.Port)
	log.Println("proxy address: ", Params.Address)

Z
zhenshan.cao 已提交
123
	defer func() {
D
dragondriver 已提交
124
		if err != nil {
Z
zhenshan.cao 已提交
125 126 127 128
			err2 := s.Stop()
			if err2 != nil {
				log.Println("Init failed, and Stop failed")
			}
D
dragondriver 已提交
129
		}
Z
zhenshan.cao 已提交
130
	}()
D
dragondriver 已提交
131

Z
zhenshan.cao 已提交
132
	s.wg.Add(1)
X
Xiangyu Wang 已提交
133
	go s.startGrpcLoop(Params.Port)
Z
zhenshan.cao 已提交
134 135
	// wait for grpc server loop start
	err = <-s.grpcErrChan
136
	log.Println("create grpc server ...")
Z
zhenshan.cao 已提交
137 138
	if err != nil {
		return err
D
dragondriver 已提交
139
	}
Z
zhenshan.cao 已提交
140

141
	s.proxyServiceClient = grpcproxyserviceclient.NewClient(Params.ProxyServiceAddress)
Z
zhenshan.cao 已提交
142 143 144
	err = s.proxyServiceClient.Init()
	if err != nil {
		return err
D
dragondriver 已提交
145
	}
Z
zhenshan.cao 已提交
146
	s.impl.SetProxyServiceClient(s.proxyServiceClient)
147
	log.Println("set proxy service client ...")
Z
zhenshan.cao 已提交
148 149

	masterServiceAddr := Params.MasterAddress
150
	log.Println("master address: ", masterServiceAddr)
Z
zhenshan.cao 已提交
151 152 153 154
	timeout := 3 * time.Second
	s.masterServiceClient, err = grcpmasterservice.NewGrpcClient(masterServiceAddr, timeout)
	if err != nil {
		return err
D
dragondriver 已提交
155
	}
Z
zhenshan.cao 已提交
156
	err = s.masterServiceClient.Init()
D
dragondriver 已提交
157
	if err != nil {
Z
zhenshan.cao 已提交
158
		return err
D
dragondriver 已提交
159
	}
Z
zhenshan.cao 已提交
160
	s.impl.SetMasterClient(s.masterServiceClient)
161
	log.Println("set master client ...")
D
dragondriver 已提交
162

Z
zhenshan.cao 已提交
163
	dataServiceAddr := Params.DataServiceAddress
164
	log.Println("data service address ...")
Z
zhenshan.cao 已提交
165 166 167 168 169 170
	s.dataServiceClient = grpcdataservice.NewClient(dataServiceAddr)
	err = s.dataServiceClient.Init()
	if err != nil {
		return err
	}
	s.impl.SetDataServiceClient(s.dataServiceClient)
171
	log.Println("set data service address ...")
Z
zhenshan.cao 已提交
172 173

	indexServiceAddr := Params.IndexServerAddress
174
	log.Println("index server address: ", indexServiceAddr)
Z
zhenshan.cao 已提交
175 176
	s.indexServiceClient = grpcindexserviceclient.NewClient(indexServiceAddr)
	err = s.indexServiceClient.Init()
177 178 179
	if err != nil {
		return err
	}
Z
zhenshan.cao 已提交
180
	s.impl.SetIndexServiceClient(s.indexServiceClient)
181 182 183 184 185 186 187 188 189 190 191
	log.Println("set index service client ...")

	// queryServiceAddr := Params.QueryServiceAddress
	// log.Println("query service address: ", queryServiceAddr)
	// s.queryServiceClient = grpcqueryserviceclient.NewClient(queryServiceAddr)
	// err = s.queryServiceClient.Init()
	// if err != nil {
	// 	return err
	// }
	// s.impl.SetQueryServiceClient(s.queryServiceClient)
	// log.Println("set query service client ...")
192

Z
zhenshan.cao 已提交
193
	proxynode.Params.Init()
194
	log.Println("init params done ...")
Z
zhenshan.cao 已提交
195 196 197 198 199
	proxynode.Params.NetworkPort = Params.Port
	proxynode.Params.IP = Params.IP
	proxynode.Params.NetworkAddress = Params.Address
	// for purpose of ID Allocator
	proxynode.Params.MasterAddress = Params.MasterAddress
200

Z
zhenshan.cao 已提交
201
	s.impl.UpdateStateCode(internalpb2.StateCode_INITIALIZING)
202

Z
zhenshan.cao 已提交
203
	if err := s.impl.Init(); err != nil {
204
		log.Println("impl init error: ", err)
Z
zhenshan.cao 已提交
205 206 207 208 209
		return err
	}

	return nil
}
210

Z
zhenshan.cao 已提交
211
func (s *Server) start() error {
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
	return s.impl.Start()
}

// Stop shuts the server down: it gracefully stops the gRPC server if one was
// created, stops the proxy node implementation, and waits for the gRPC loop
// goroutine to finish. It returns the error reported by the implementation's
// Stop, if any.
func (s *Server) Stop() error {
	if s.grpcServer != nil {
		s.grpcServer.GracefulStop()
	}

	err := s.impl.Stop()

	// Wait for the gRPC loop even when impl.Stop failed, so that Stop never
	// returns while the goroutine started by init is still running.
	s.wg.Wait()

	return err
}

// InvalidateCollectionMetaCache passes ctx and request to the proxy node
// implementation's InvalidateCollectionMetaCache and returns its status and
// error unchanged.
func (s *Server) InvalidateCollectionMetaCache(ctx context.Context, request *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
	return s.impl.InvalidateCollectionMetaCache(ctx, request)
}

func (s *Server) CreateCollection(ctx context.Context, request *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
237
	return s.impl.CreateCollection(request)
238 239 240
}

func (s *Server) DropCollection(ctx context.Context, request *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
241
	return s.impl.DropCollection(request)
242 243 244
}

func (s *Server) HasCollection(ctx context.Context, request *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
Z
zhenshan.cao 已提交
245
	return s.impl.HasCollection(request)
246 247 248
}

func (s *Server) LoadCollection(ctx context.Context, request *milvuspb.LoadCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
249
	return s.impl.LoadCollection(request)
250 251 252
}

func (s *Server) ReleaseCollection(ctx context.Context, request *milvuspb.ReleaseCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
253
	return s.impl.ReleaseCollection(request)
254 255 256
}

func (s *Server) DescribeCollection(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
Z
zhenshan.cao 已提交
257
	return s.impl.DescribeCollection(request)
258 259 260
}

func (s *Server) GetCollectionStatistics(ctx context.Context, request *milvuspb.CollectionStatsRequest) (*milvuspb.CollectionStatsResponse, error) {
Z
zhenshan.cao 已提交
261
	return s.impl.GetCollectionStatistics(request)
262 263 264
}

func (s *Server) ShowCollections(ctx context.Context, request *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
Z
zhenshan.cao 已提交
265
	return s.impl.ShowCollections(request)
266 267 268
}

func (s *Server) CreatePartition(ctx context.Context, request *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
269
	return s.impl.CreatePartition(request)
270 271 272
}

func (s *Server) DropPartition(ctx context.Context, request *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
273
	return s.impl.DropPartition(request)
274 275 276
}

func (s *Server) HasPartition(ctx context.Context, request *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
Z
zhenshan.cao 已提交
277
	return s.impl.HasPartition(request)
278 279 280
}

func (s *Server) LoadPartitions(ctx context.Context, request *milvuspb.LoadPartitonRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
281
	return s.impl.LoadPartitions(request)
282 283 284
}

func (s *Server) ReleasePartitions(ctx context.Context, request *milvuspb.ReleasePartitionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
285
	return s.impl.ReleasePartitions(request)
286 287 288
}

func (s *Server) GetPartitionStatistics(ctx context.Context, request *milvuspb.PartitionStatsRequest) (*milvuspb.PartitionStatsResponse, error) {
Z
zhenshan.cao 已提交
289
	return s.impl.GetPartitionStatistics(request)
290 291 292
}

func (s *Server) ShowPartitions(ctx context.Context, request *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
Z
zhenshan.cao 已提交
293
	return s.impl.ShowPartitions(request)
294 295 296
}

func (s *Server) CreateIndex(ctx context.Context, request *milvuspb.CreateIndexRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
297
	return s.impl.CreateIndex(request)
298 299 300
}

func (s *Server) DescribeIndex(ctx context.Context, request *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) {
Z
zhenshan.cao 已提交
301
	return s.impl.DescribeIndex(request)
302 303 304
}

func (s *Server) GetIndexState(ctx context.Context, request *milvuspb.IndexStateRequest) (*milvuspb.IndexStateResponse, error) {
Z
zhenshan.cao 已提交
305
	return s.impl.GetIndexState(request)
306 307 308
}

func (s *Server) Insert(ctx context.Context, request *milvuspb.InsertRequest) (*milvuspb.InsertResponse, error) {
Z
zhenshan.cao 已提交
309
	return s.impl.Insert(request)
310 311 312
}

func (s *Server) Search(ctx context.Context, request *milvuspb.SearchRequest) (*milvuspb.SearchResults, error) {
Z
zhenshan.cao 已提交
313
	return s.impl.Search(request)
314 315 316
}

func (s *Server) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
317
	return s.impl.Flush(request)
318 319 320
}

func (s *Server) GetDdChannel(ctx context.Context, request *commonpb.Empty) (*milvuspb.StringResponse, error) {
Z
zhenshan.cao 已提交
321
	return s.impl.GetDdChannel(request)
322
}
Z
zhenshan.cao 已提交
323 324

func (s *Server) GetPersistentSegmentInfo(ctx context.Context, request *milvuspb.PersistentSegmentInfoRequest) (*milvuspb.PersistentSegmentInfoResponse, error) {
X
XuanYang-cn 已提交
325
	return s.impl.GetPersistentSegmentInfo(request)
Z
zhenshan.cao 已提交
326
}
Z
zhenshan.cao 已提交
327 328 329 330 331

// GetQuerySegmentInfo hands request to the proxy node implementation's
// GetQuerySegmentInfo and returns its response and error unchanged. The ctx
// argument is not forwarded.
func (s *Server) GetQuerySegmentInfo(ctx context.Context, request *milvuspb.QuerySegmentInfoRequest) (*milvuspb.QuerySegmentInfoResponse, error) {
	return s.impl.GetQuerySegmentInfo(request)
}