提交 90d471b1 编写于 作者: X XuanYang-cn 提交者: yefu.chen

Refactor main of queryservice and querynode

Signed-off-by: NXuanYang-cn <xuan.yang@zilliz.com>
上级 93fd6b70
...@@ -122,6 +122,8 @@ build-go: build-cpp ...@@ -122,6 +122,8 @@ build-go: build-cpp
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxynode $(PWD)/cmd/proxy/node/proxy_node.go 1>/dev/null @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/proxynode $(PWD)/cmd/proxy/node/proxy_node.go 1>/dev/null
@echo "Building query service ..." @echo "Building query service ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/queryservice $(PWD)/cmd/queryservice/queryservice.go 1>/dev/null
@echo "Building query node ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/querynode $(PWD)/cmd/querynode/querynode.go 1>/dev/null
@echo "Building binlog ..." @echo "Building binlog ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null @mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null
@echo "Building singlenode ..." @echo "Building singlenode ..."
......
...@@ -2,19 +2,238 @@ package components ...@@ -2,19 +2,238 @@ package components
import ( import (
"context" "context"
"fmt"
"log"
"time"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
isc "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
qns "github.com/zilliztech/milvus-distributed/internal/distributed/querynode"
qsc "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
is "github.com/zilliztech/milvus-distributed/internal/indexservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
qs "github.com/zilliztech/milvus-distributed/internal/queryservice"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
) )
func NewQueryNode(ctx context.Context) (*QueryNode, error) { type QueryNode struct {
return nil, nil ctx context.Context
svr *qns.Server
dataService *dsc.Client
masterService *msc.GrpcClient
indexService *isc.Client
queryService *qsc.Client
} }
type QueryNode struct { func NewQueryNode(ctx context.Context) (*QueryNode, error) {
const retry = 10
const interval = 500
svr, err := qns.NewServer(ctx)
if err != nil {
panic(err)
}
// --- QueryService ---
qs.Params.Init()
log.Println("QueryService address:", qs.Params.Address)
log.Println("Init Query service client ...")
queryService, err := qsc.NewClient(qs.Params.Address, 20*time.Second)
if err != nil {
panic(err)
}
if err = queryService.Init(); err != nil {
panic(err)
}
if err = queryService.Start(); err != nil {
panic(err)
}
var cnt int
for cnt = 0; cnt < retry; cnt++ {
if cnt != 0 {
log.Println("Query service isn't ready ...")
log.Printf("Retrying getting query service's states in ... %v ms", interval)
}
qsStates, err := queryService.GetComponentStates()
if err != nil {
continue
}
if qsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if qsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && qsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Query service isn't ready")
}
if err := svr.SetQueryService(queryService); err != nil {
panic(err)
}
// --- Master Service Client ---
ms.Params.Init()
addr := fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port)
log.Println("Master service address:", addr)
log.Println("Init master service client ...")
masterService, err := msc.NewGrpcClient(addr, 20*time.Second)
if err != nil {
panic(err)
}
if err = masterService.Init(); err != nil {
panic(err)
}
if err = masterService.Start(); err != nil {
panic(err)
}
ticker := time.NewTicker(interval * time.Millisecond)
tctx, tcancel := context.WithTimeout(ctx, 10*interval*time.Millisecond)
defer func() {
ticker.Stop()
tcancel()
}()
for {
var states *internalpb2.ComponentStates
select {
case <-ticker.C:
states, err = masterService.GetComponentStates()
if err != nil {
continue
}
case <-tctx.Done():
return nil, errors.New("master client connect timeout")
}
if states.State.StateCode == internalpb2.StateCode_HEALTHY {
break
}
}
if err := svr.SetMasterService(masterService); err != nil {
panic(err)
}
// --- IndexService ---
is.Params.Init()
log.Println("Index service address:", is.Params.Address)
indexService := isc.NewClient(is.Params.Address)
if err := indexService.Init(); err != nil {
panic(err)
}
if err := indexService.Start(); err != nil {
panic(err)
}
ticker = time.NewTicker(interval * time.Millisecond)
tctx, tcancel = context.WithTimeout(ctx, 10*interval*time.Millisecond)
defer func() {
ticker.Stop()
tcancel()
}()
for {
var states *internalpb2.ComponentStates
select {
case <-ticker.C:
states, err = indexService.GetComponentStates()
if err != nil {
continue
}
case <-tctx.Done():
return nil, errors.New("Index service client connect timeout")
}
if states.State.StateCode == internalpb2.StateCode_HEALTHY {
break
}
}
if err := svr.SetIndexService(indexService); err != nil {
panic(err)
}
// --- DataService ---
ds.Params.Init()
log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port)
log.Println("Init data service client ...")
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < retry; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
log.Printf("retry cout = %d, error = %s", cnt, err.Error())
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Data service isn't ready")
}
if err := svr.SetDataService(dataService); err != nil {
panic(err)
}
return &QueryNode{
ctx: ctx,
svr: svr,
dataService: dataService,
masterService: masterService,
indexService: indexService,
queryService: queryService,
}, nil
} }
func (ps *QueryNode) Run() error { func (q *QueryNode) Run() error {
if err := q.svr.Init(); err != nil {
panic(err)
}
if err := q.svr.Start(); err != nil {
panic(err)
}
log.Println("Query node successfully started ...")
return nil return nil
} }
func (ps *QueryNode) Stop() error { func (q *QueryNode) Stop() error {
return nil _ = q.dataService.Stop()
_ = q.masterService.Stop()
_ = q.queryService.Stop()
_ = q.indexService.Stop()
return q.svr.Stop()
} }
...@@ -2,19 +2,138 @@ package components ...@@ -2,19 +2,138 @@ package components
import ( import (
"context" "context"
"fmt"
"log"
"time"
dsc "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
msc "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
qs "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice"
ds "github.com/zilliztech/milvus-distributed/internal/dataservice"
ms "github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/queryservice"
) )
func NewQueryService(ctx context.Context) (*QueryService, error) { type QueryService struct {
return nil, nil ctx context.Context
svr *qs.Server
dataService *dsc.Client
masterService *msc.GrpcClient
} }
type QueryService struct { func NewQueryService(ctx context.Context) (*QueryService, error) {
const retry = 10
const interval = 200
queryservice.Params.Init()
svr := qs.NewServer(ctx)
log.Println("Queryservice id is", queryservice.Params.QueryServiceID)
// --- Master Service Client ---
ms.Params.Init()
log.Printf("Master service address: %s:%d", ms.Params.Address, ms.Params.Port)
log.Println("Init master service client ...")
masterService, err := msc.NewGrpcClient(fmt.Sprintf("%s:%d", ms.Params.Address, ms.Params.Port), 20*time.Second)
if err != nil {
panic(err)
}
if err = masterService.Init(); err != nil {
panic(err)
}
if err = masterService.Start(); err != nil {
panic(err)
}
var cnt int
for cnt = 0; cnt < retry; cnt++ {
time.Sleep(time.Duration(cnt*interval) * time.Millisecond)
if cnt != 0 {
log.Println("Master service isn't ready ...")
log.Printf("Retrying getting master service's states in ... %v ms", interval)
}
msStates, err := masterService.GetComponentStates()
if err != nil {
continue
}
if msStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if msStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Master service isn't ready")
}
if err := svr.SetMasterService(masterService); err != nil {
panic(err)
}
// --- Data service client ---
ds.Params.Init()
log.Printf("Data service address: %s:%d", ds.Params.Address, ds.Params.Port)
log.Println("Init data service client ...")
dataService := dsc.NewClient(fmt.Sprintf("%s:%d", ds.Params.Address, ds.Params.Port))
if err = dataService.Init(); err != nil {
panic(err)
}
if err = dataService.Start(); err != nil {
panic(err)
}
for cnt = 0; cnt < retry; cnt++ {
dsStates, err := dataService.GetComponentStates()
if err != nil {
continue
}
if dsStates.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
continue
}
if dsStates.State.StateCode != internalpb2.StateCode_INITIALIZING && dsStates.State.StateCode != internalpb2.StateCode_HEALTHY {
continue
}
break
}
if cnt >= retry {
panic("Data service isn't ready")
}
if err := svr.SetDataService(dataService); err != nil {
panic(err)
}
return &QueryService{
ctx: ctx,
svr: svr,
dataService: dataService,
masterService: masterService,
}, nil
} }
func (ps *QueryService) Run() error { func (qs *QueryService) Run() error {
if err := qs.svr.Init(); err != nil {
panic(err)
}
if err := qs.svr.Start(); err != nil {
panic(err)
}
log.Println("Data node successfully started ...")
return nil return nil
} }
func (ps *QueryService) Stop() error { func (qs *QueryService) Stop() error {
return nil _ = qs.dataService.Stop()
_ = qs.masterService.Stop()
return qs.svr.Stop()
} }
...@@ -7,16 +7,20 @@ import ( ...@@ -7,16 +7,20 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"go.uber.org/zap" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
grpcquerynode "github.com/zilliztech/milvus-distributed/internal/distributed/querynode"
) )
func main() { func main() {
// Creates server.
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
svr := grpcquerynode.NewServer(ctx) defer cancel()
if err := svr.Init(); err != nil {
svr, err := distributed.NewQueryNode(ctx)
if err != nil {
panic(err)
}
if err = svr.Run(); err != nil {
panic(err) panic(err)
} }
...@@ -27,30 +31,10 @@ func main() { ...@@ -27,30 +31,10 @@ func main() {
syscall.SIGTERM, syscall.SIGTERM,
syscall.SIGQUIT) syscall.SIGQUIT)
var sig os.Signal sig := <-sc
go func() { log.Print("Got signal to exit", sig.String())
sig = <-sc
cancel()
}()
if err := svr.Start(); err != nil {
panic(err)
}
<-ctx.Done()
log.Print("Got signal to exit", zap.String("signal", sig.String()))
if err := svr.Stop(); err != nil { if err := svr.Stop(); err != nil {
panic(err) panic(err)
} }
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}
func exit(code int) {
os.Exit(code)
} }
...@@ -7,23 +7,19 @@ import ( ...@@ -7,23 +7,19 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
grpcqueryservice "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice" distributed "github.com/zilliztech/milvus-distributed/cmd/distributed/components"
"github.com/zilliztech/milvus-distributed/internal/queryservice"
) )
func main() { func main() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
svr := grpcqueryservice.NewServer(ctx) svr, err := distributed.NewQueryService(ctx)
if err != nil {
if err := svr.Init(); err != nil {
panic(err) panic(err)
} }
log.Printf("query service address : %s", queryservice.Params.Address) if err := svr.Run(); err != nil {
if err := svr.Start(); err != nil {
panic(err) panic(err)
} }
...@@ -35,5 +31,8 @@ func main() { ...@@ -35,5 +31,8 @@ func main() {
syscall.SIGQUIT) syscall.SIGQUIT)
sig := <-sc sig := <-sc
log.Printf("Got %s signal to exit", sig.String()) log.Printf("Got %s signal to exit", sig.String())
_ = svr.Stop()
if err := svr.Stop(); err != nil {
panic(err)
}
} }
...@@ -7,6 +7,7 @@ import ( ...@@ -7,6 +7,7 @@ import (
"sync" "sync"
dn "github.com/zilliztech/milvus-distributed/internal/datanode" dn "github.com/zilliztech/milvus-distributed/internal/datanode"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
...@@ -74,7 +75,10 @@ func (s *Server) Start() error { ...@@ -74,7 +75,10 @@ func (s *Server) Start() error {
} }
func (s *Server) Stop() error { func (s *Server) Stop() error {
return s.core.Stop() err := s.core.Stop()
s.cancel()
s.grpcServer.GracefulStop()
return err
} }
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) { func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
...@@ -86,6 +90,12 @@ func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelR ...@@ -86,6 +90,12 @@ func (s *Server) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelR
} }
func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) { func (s *Server) FlushSegments(ctx context.Context, in *datapb.FlushSegRequest) (*commonpb.Status, error) {
if s.core.State != internalpb2.StateCode_HEALTHY {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "DataNode isn't healthy.",
}, errors.Errorf("DataNode is not ready yet")
}
return &commonpb.Status{ return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS, ErrorCode: commonpb.ErrorCode_SUCCESS,
}, s.core.FlushSegments(in) }, s.core.FlushSegments(in)
......
...@@ -2,7 +2,6 @@ package grpcquerynodeclient ...@@ -2,7 +2,6 @@ package grpcquerynodeclient
import ( import (
"context" "context"
"log"
"time" "time"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -12,9 +11,46 @@ import ( ...@@ -12,9 +11,46 @@ import (
"github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
) )
const (
RPCConnectionTimeout = 30 * time.Second
Retry = 3
)
type Client struct { type Client struct {
ctx context.Context ctx context.Context
grpcClient querypb.QueryNodeClient grpcClient querypb.QueryNodeClient
conn *grpc.ClientConn
addr string
}
func NewClient(address string) *Client {
return &Client{
addr: address,
}
}
func (c *Client) Init() error {
ctx, cancel := context.WithTimeout(context.Background(), RPCConnectionTimeout)
defer cancel()
var err error
for i := 0; i < Retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err == nil {
break
}
}
if err != nil {
return err
}
c.grpcClient = querypb.NewQueryNodeClient(c.conn)
return nil
}
func (c *Client) Start() error {
return nil
}
func (c *Client) Stop() error {
return c.conn.Close()
} }
func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
...@@ -60,17 +96,3 @@ func (c *Client) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status, ...@@ -60,17 +96,3 @@ func (c *Client) LoadSegments(in *querypb.LoadSegmentRequest) (*commonpb.Status,
func (c *Client) ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) { func (c *Client) ReleaseSegments(in *querypb.ReleaseSegmentRequest) (*commonpb.Status, error) {
return c.grpcClient.ReleaseSegments(context.TODO(), in) return c.grpcClient.ReleaseSegments(context.TODO(), in)
} }
func NewClient(address string) *Client {
ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("connect to queryNode failed, error= %v", err)
}
log.Printf("connected to queryNode, queryNode=%s", address)
return &Client{
grpcClient: querypb.NewQueryNodeClient(conn),
}
}
...@@ -5,78 +5,58 @@ import ( ...@@ -5,78 +5,58 @@ import (
"fmt" "fmt"
"log" "log"
"net" "net"
"time" "sync"
"google.golang.org/grpc" "google.golang.org/grpc"
"github.com/zilliztech/milvus-distributed/internal/dataservice"
grpcdataserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
grpcindexserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
grpcmasterserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
grpcqueryserviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/queryservice/client"
"github.com/zilliztech/milvus-distributed/internal/indexservice"
"github.com/zilliztech/milvus-distributed/internal/masterservice"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/proto/querypb" "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
"github.com/zilliztech/milvus-distributed/internal/querynode" qn "github.com/zilliztech/milvus-distributed/internal/querynode"
"github.com/zilliztech/milvus-distributed/internal/queryservice"
) )
type Server struct { type Server struct {
node *qn.QueryNode
grpcServer *grpc.Server grpcServer *grpc.Server
node *querynode.QueryNode grpcError error
} grpcErrMux sync.Mutex
func NewServer(ctx context.Context) *Server { ctx context.Context
server := &Server{ cancel context.CancelFunc
node: querynode.NewQueryNodeWithoutID(ctx), }
}
queryservice.Params.Init() func NewServer(ctx context.Context) (*Server, error) {
queryClient := grpcqueryserviceclient.NewClient(queryservice.Params.Address) s := &Server{
if err := server.node.SetQueryService(queryClient); err != nil { ctx: ctx,
panic(err) node: qn.NewQueryNodeWithoutID(ctx),
} }
masterservice.Params.Init() qn.Params.Init()
masterConnectTimeout := 10 * time.Second s.grpcServer = grpc.NewServer()
masterClient, err := grpcmasterserviceclient.NewGrpcClient(masterservice.Params.Address, masterConnectTimeout) querypb.RegisterQueryNodeServer(s.grpcServer, s)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", qn.Params.QueryNodePort))
if err != nil { if err != nil {
panic(err) return nil, err
}
if err = server.node.SetMasterService(masterClient); err != nil {
panic(err)
}
indexservice.Params.Init()
indexClient := grpcindexserviceclient.NewClient(indexservice.Params.Address)
if err := server.node.SetIndexService(indexClient); err != nil {
panic(err)
} }
dataservice.Params.Init() go func() {
log.Println("connect to data service, address =", fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port)) log.Println("start query node grpc server...")
dataClient := grpcdataserviceclient.NewClient(fmt.Sprint(dataservice.Params.Address, ":", dataservice.Params.Port)) if err = s.grpcServer.Serve(lis); err != nil {
if err := server.node.SetDataService(dataClient); err != nil { s.grpcErrMux.Lock()
panic(err) defer s.grpcErrMux.Unlock()
} s.grpcError = err
}
}()
return server s.grpcErrMux.Lock()
} err = s.grpcError
s.grpcErrMux.Unlock()
func (s *Server) StartGrpcServer() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", querynode.Params.QueryNodePort))
if err != nil { if err != nil {
panic(err) return nil, err
}
s.grpcServer = grpc.NewServer()
querypb.RegisterQueryNodeServer(s.grpcServer, s)
fmt.Println("start query node grpc server...")
if err = s.grpcServer.Serve(lis); err != nil {
panic(err)
} }
return s, nil
} }
func (s *Server) Init() error { func (s *Server) Init() error {
...@@ -84,13 +64,30 @@ func (s *Server) Init() error { ...@@ -84,13 +64,30 @@ func (s *Server) Init() error {
} }
func (s *Server) Start() error { func (s *Server) Start() error {
go s.StartGrpcServer()
return s.node.Start() return s.node.Start()
} }
func (s *Server) Stop() error { func (s *Server) Stop() error {
s.grpcServer.Stop() err := s.node.Stop()
return s.node.Stop() s.cancel()
s.grpcServer.GracefulStop()
return err
}
func (s *Server) SetMasterService(master qn.MasterServiceInterface) error {
return s.node.SetMasterService(master)
}
func (s *Server) SetQueryService(query qn.QueryServiceInterface) error {
return s.node.SetQueryService(query)
}
func (s *Server) SetIndexService(index qn.IndexServiceInterface) error {
return s.node.SetIndexService(index)
}
func (s *Server) SetDataService(data qn.DataServiceInterface) error {
return s.node.SetDataService(data)
} }
func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) { func (s *Server) GetTimeTickChannel(ctx context.Context, in *commonpb.Empty) (*milvuspb.StringResponse, error) {
......
...@@ -2,6 +2,7 @@ package grpcqueryserviceclient ...@@ -2,6 +2,7 @@ package grpcqueryserviceclient
import ( import (
"context" "context"
"errors"
"log" "log"
"time" "time"
...@@ -14,30 +15,76 @@ import ( ...@@ -14,30 +15,76 @@ import (
type Client struct { type Client struct {
grpcClient querypb.QueryServiceClient grpcClient querypb.QueryServiceClient
conn *grpc.ClientConn
addr string
timeout time.Duration
retry int
}
func NewClient(address string, timeout time.Duration) (*Client, error) {
return &Client{
grpcClient: nil,
conn: nil,
addr: address,
timeout: timeout,
retry: 3,
}, nil
} }
func (c *Client) Init() error { func (c *Client) Init() error {
panic("implement me") ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()
var err error
for i := 0; i < c.retry; i++ {
if c.conn, err = grpc.DialContext(ctx, c.addr, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
break
}
}
if err != nil {
return err
}
c.grpcClient = querypb.NewQueryServiceClient(c.conn)
log.Printf("connected to queryService, queryService=%s", c.addr)
return nil
} }
func (c *Client) Start() error { func (c *Client) Start() error {
panic("implement me") return nil
} }
func (c *Client) Stop() error { func (c *Client) Stop() error {
panic("implement me") return c.conn.Close()
} }
func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) { func (c *Client) GetComponentStates() (*internalpb2.ComponentStates, error) {
panic("implement me") return c.grpcClient.GetComponentStates(context.Background(), &commonpb.Empty{})
} }
func (c *Client) GetTimeTickChannel() (string, error) { func (c *Client) GetTimeTickChannel() (string, error) {
panic("implement me") resp, err := c.grpcClient.GetTimeTickChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
} }
func (c *Client) GetStatisticsChannel() (string, error) { func (c *Client) GetStatisticsChannel() (string, error) {
panic("implement me") resp, err := c.grpcClient.GetStatisticsChannel(context.Background(), &commonpb.Empty{})
if err != nil {
return "", err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
return "", errors.New(resp.Status.Reason)
}
return resp.Value, nil
} }
func (c *Client) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { func (c *Client) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
...@@ -75,17 +122,3 @@ func (c *Client) CreateQueryChannel() (*querypb.CreateQueryChannelResponse, erro ...@@ -75,17 +122,3 @@ func (c *Client) CreateQueryChannel() (*querypb.CreateQueryChannelResponse, erro
func (c *Client) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) { func (c *Client) GetPartitionStates(req *querypb.PartitionStatesRequest) (*querypb.PartitionStatesResponse, error) {
return c.grpcClient.GetPartitionStates(context.TODO(), req) return c.grpcClient.GetPartitionStates(context.TODO(), req)
} }
func NewClient(address string) *Client {
ctx1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
conn, err := grpc.DialContext(ctx1, address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
log.Printf("connect to queryService failed, error= %v", err)
}
log.Printf("connected to queryService, queryService=%s", address)
return &Client{
grpcClient: querypb.NewQueryServiceClient(conn),
}
}
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice" "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
"github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
...@@ -119,6 +120,16 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty) ...@@ -119,6 +120,16 @@ func (s *Server) GetStatisticsChannel(ctx context.Context, req *commonpb.Empty)
}, nil }, nil
} }
func (s *Server) SetMasterService(m queryservice.MasterServiceInterface) error {
s.queryService.SetMasterService(m)
return nil
}
func (s *Server) SetDataService(d queryservice.DataServiceInterface) error {
s.queryService.SetDataService(d)
return nil
}
func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { func (s *Server) RegisterNode(ctx context.Context, req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) {
return s.queryService.RegisterNode(req) return s.queryService.RegisterNode(req)
} }
...@@ -157,7 +168,7 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (* ...@@ -157,7 +168,7 @@ func (s *Server) CreateQueryChannel(ctx context.Context, req *commonpb.Empty) (*
func NewServer(ctx context.Context) *Server { func NewServer(ctx context.Context) *Server {
ctx1, cancel := context.WithCancel(ctx) ctx1, cancel := context.WithCancel(ctx)
service, err := queryservice.NewQueryService(ctx) service, err := queryservice.NewQueryService(ctx1)
if err != nil { if err != nil {
log.Fatal(errors.New("create QueryService failed")) log.Fatal(errors.New("create QueryService failed"))
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册