client.go 1.7 KB
Newer Older
1
package grpcdatanodeclient
X
XuanYang-cn 已提交
2 3 4

import (
	"context"
X
XuanYang-cn 已提交
5
	"time"
X
XuanYang-cn 已提交
6

X
XuanYang-cn 已提交
7
	"github.com/zilliztech/milvus-distributed/internal/log"
X
XuanYang-cn 已提交
8
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
X
XuanYang-cn 已提交
9
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
X
XuanYang-cn 已提交
10
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
11
	"github.com/zilliztech/milvus-distributed/internal/util/retry"
X
XuanYang-cn 已提交
12

X
XuanYang-cn 已提交
13
	"go.uber.org/zap"
X
XuanYang-cn 已提交
14
	"google.golang.org/grpc"
X
XuanYang-cn 已提交
15 16 17
)

type Client struct {
X
XuanYang-cn 已提交
18 19 20 21 22
	ctx     context.Context
	grpc    datapb.DataNodeClient
	conn    *grpc.ClientConn
	address string
}
X
XuanYang-cn 已提交
23

X
XuanYang-cn 已提交
24 25 26
func NewClient(address string) *Client {
	return &Client{
		address: address,
27
		ctx:     context.Background(),
X
XuanYang-cn 已提交
28
	}
X
XuanYang-cn 已提交
29 30
}

N
neza2017 已提交
31
func (c *Client) Init() error {
32 33

	connectGrpcFunc := func() error {
X
XuanYang-cn 已提交
34
		log.Debug("DataNode connect czs::", zap.String("address", c.address))
35 36 37
		conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock())
		if err != nil {
			return err
X
XuanYang-cn 已提交
38
		}
39 40
		c.conn = conn
		return nil
X
XuanYang-cn 已提交
41
	}
42 43

	err := retry.Retry(100, time.Millisecond*200, connectGrpcFunc)
X
XuanYang-cn 已提交
44 45 46 47 48
	if err != nil {
		return err
	}
	c.grpc = datapb.NewDataNodeClient(c.conn)
	return nil
X
XuanYang-cn 已提交
49 50
}

N
neza2017 已提交
51
func (c *Client) Start() error {
X
XuanYang-cn 已提交
52
	return nil
X
XuanYang-cn 已提交
53 54
}

N
neza2017 已提交
55
func (c *Client) Stop() error {
X
XuanYang-cn 已提交
56
	return c.conn.Close()
X
XuanYang-cn 已提交
57 58
}

X
XuanYang-cn 已提交
59
func (c *Client) GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
X
XuanYang-cn 已提交
60
	return c.grpc.GetComponentStates(context.Background(), empty)
X
XuanYang-cn 已提交
61 62
}

X
XuanYang-cn 已提交
63 64
func (c *Client) WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error) {
	return c.grpc.WatchDmChannels(context.Background(), in)
X
XuanYang-cn 已提交
65 66
}

X
XuanYang-cn 已提交
67 68
func (c *Client) FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) {
	return c.grpc.FlushSegments(context.Background(), in)
X
XuanYang-cn 已提交
69
}