client.go 2.8 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcdatanodeclient
X
XuanYang-cn 已提交
13 14 15

import (
	"context"
X
XuanYang-cn 已提交
16
	"time"
X
XuanYang-cn 已提交
17

G
godchen 已提交
18 19
	otgrpc "github.com/opentracing-contrib/go-grpc"
	"github.com/opentracing/opentracing-go"
X
XuanYang-cn 已提交
20
	"github.com/zilliztech/milvus-distributed/internal/log"
N
neza2017 已提交
21 22
	"github.com/zilliztech/milvus-distributed/internal/util/retry"

X
XuanYang-cn 已提交
23
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
X
XuanYang-cn 已提交
24
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
G
godchen 已提交
25
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
N
neza2017 已提交
26
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
X
XuanYang-cn 已提交
27

X
XuanYang-cn 已提交
28
	"go.uber.org/zap"
X
XuanYang-cn 已提交
29
	"google.golang.org/grpc"
X
XuanYang-cn 已提交
30 31 32
)

type Client struct {
X
XuanYang-cn 已提交
33 34 35 36 37
	ctx     context.Context
	grpc    datapb.DataNodeClient
	conn    *grpc.ClientConn
	address string
}
X
XuanYang-cn 已提交
38

X
XuanYang-cn 已提交
39 40 41
func NewClient(address string) *Client {
	return &Client{
		address: address,
42
		ctx:     context.Background(),
X
XuanYang-cn 已提交
43
	}
X
XuanYang-cn 已提交
44 45
}

N
neza2017 已提交
46
func (c *Client) Init() error {
G
godchen 已提交
47
	tracer := opentracing.GlobalTracer()
48
	connectGrpcFunc := func() error {
G
godchen 已提交
49
		log.Debug("DataNode connect ", zap.String("address", c.address))
G
godchen 已提交
50 51 52 53 54
		conn, err := grpc.DialContext(c.ctx, c.address, grpc.WithInsecure(), grpc.WithBlock(),
			grpc.WithUnaryInterceptor(
				otgrpc.OpenTracingClientInterceptor(tracer)),
			grpc.WithStreamInterceptor(
				otgrpc.OpenTracingStreamClientInterceptor(tracer)))
55 56
		if err != nil {
			return err
X
XuanYang-cn 已提交
57
		}
58 59
		c.conn = conn
		return nil
X
XuanYang-cn 已提交
60
	}
61

Z
zhenshan.cao 已提交
62
	err := retry.Retry(100000, time.Millisecond*200, connectGrpcFunc)
X
XuanYang-cn 已提交
63 64 65 66 67
	if err != nil {
		return err
	}
	c.grpc = datapb.NewDataNodeClient(c.conn)
	return nil
X
XuanYang-cn 已提交
68 69
}

N
neza2017 已提交
70
func (c *Client) Start() error {
X
XuanYang-cn 已提交
71
	return nil
X
XuanYang-cn 已提交
72 73
}

N
neza2017 已提交
74
func (c *Client) Stop() error {
X
XuanYang-cn 已提交
75
	return c.conn.Close()
X
XuanYang-cn 已提交
76 77
}

G
godchen 已提交
78 79
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	return c.grpc.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
N
neza2017 已提交
80 81 82
}

func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
83
	return c.grpc.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
X
XuanYang-cn 已提交
84 85
}

G
godchen 已提交
86 87
func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	return c.grpc.WatchDmChannels(ctx, req)
X
XuanYang-cn 已提交
88 89
}

G
godchen 已提交
90 91
func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
	return c.grpc.FlushSegments(ctx, req)
X
XuanYang-cn 已提交
92
}