client.go 4.3 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcdatanodeclient
X
XuanYang-cn 已提交
13 14 15

import (
	"context"
G
godchen 已提交
16
	"fmt"
X
XuanYang-cn 已提交
17
	"time"
X
XuanYang-cn 已提交
18

X
Xiangyu Wang 已提交
19 20
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/util/retry"
N
neza2017 已提交
21

G
godchen 已提交
22 23
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
G
godchen 已提交
24
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
X
Xiangyu Wang 已提交
25 26 27 28
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
G
godchen 已提交
29
	"github.com/milvus-io/milvus/internal/util/trace"
X
XuanYang-cn 已提交
30

X
XuanYang-cn 已提交
31
	"go.uber.org/zap"
X
XuanYang-cn 已提交
32
	"google.golang.org/grpc"
X
XuanYang-cn 已提交
33 34 35
)

type Client struct {
G
godchen 已提交
36 37
	ctx    context.Context
	cancel context.CancelFunc
G
godchen 已提交
38 39 40 41

	grpc datapb.DataNodeClient
	conn *grpc.ClientConn

G
godchen 已提交
42
	addr string
G
godchen 已提交
43

G
godchen 已提交
44
	retryOptions []retry.Option
X
XuanYang-cn 已提交
45
}
X
XuanYang-cn 已提交
46

G
godchen 已提交
47
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
G
godchen 已提交
48
	if addr == "" {
G
godchen 已提交
49
		return nil, fmt.Errorf("address is empty")
G
godchen 已提交
50 51
	}

G
godchen 已提交
52
	ctx, cancel := context.WithCancel(ctx)
G
godchen 已提交
53
	return &Client{
G
godchen 已提交
54 55 56 57
		ctx:          ctx,
		cancel:       cancel,
		addr:         addr,
		retryOptions: retryOptions,
G
godchen 已提交
58
	}, nil
X
XuanYang-cn 已提交
59 60
}

N
neza2017 已提交
61
func (c *Client) Init() error {
G
godchen 已提交
62
	return c.connect()
G
godchen 已提交
63 64
}

65
func (c *Client) connect() error {
66
	connectGrpcFunc := func() error {
G
godchen 已提交
67 68
		opts := trace.GetInterceptorOpts()
		log.Debug("DataNode connect ", zap.String("address", c.addr))
G
godchen 已提交
69 70
		conn, err := grpc.DialContext(c.ctx, c.addr,
			grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
G
godchen 已提交
71
			grpc.WithUnaryInterceptor(
G
godchen 已提交
72 73 74 75
				grpc_middleware.ChainUnaryClient(
					grpc_retry.UnaryClientInterceptor(),
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
76
			grpc.WithStreamInterceptor(
G
godchen 已提交
77 78 79 80 81
				grpc_middleware.ChainStreamClient(
					grpc_retry.StreamClientInterceptor(),
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
82 83
		if err != nil {
			return err
X
XuanYang-cn 已提交
84
		}
85 86
		c.conn = conn
		return nil
X
XuanYang-cn 已提交
87
	}
88

G
godchen 已提交
89
	err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
X
XuanYang-cn 已提交
90
	if err != nil {
91
		log.Debug("DataNodeClient try connect failed", zap.Error(err))
X
XuanYang-cn 已提交
92 93
		return err
	}
94
	log.Debug("DataNodeClient connect success")
X
XuanYang-cn 已提交
95 96
	c.grpc = datapb.NewDataNodeClient(c.conn)
	return nil
X
XuanYang-cn 已提交
97 98
}

G
godchen 已提交
99 100 101 102 103
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
104
	err = c.connect()
G
godchen 已提交
105
	if err != nil {
G
godchen 已提交
106
		return ret, err
G
godchen 已提交
107 108 109 110 111
	}
	ret, err = caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
112 113 114
	return ret, err
}

N
neza2017 已提交
115
func (c *Client) Start() error {
X
XuanYang-cn 已提交
116
	return nil
X
XuanYang-cn 已提交
117 118
}

N
neza2017 已提交
119
func (c *Client) Stop() error {
G
godchen 已提交
120
	c.cancel()
X
XuanYang-cn 已提交
121
	return c.conn.Close()
X
XuanYang-cn 已提交
122 123
}

124 125 126 127 128
// Register is a placeholder that performs no work and always returns nil.
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
129
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
130 131 132 133
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
	})
	return ret.(*internalpb.ComponentStates), err
N
neza2017 已提交
134 135 136
}

func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
137 138 139 140
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
	})
	return ret.(*milvuspb.StringResponse), err
X
XuanYang-cn 已提交
141 142
}

G
godchen 已提交
143
func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
G
godchen 已提交
144 145 146 147
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.WatchDmChannels(ctx, req)
	})
	return ret.(*commonpb.Status), err
X
XuanYang-cn 已提交
148 149
}

G
godchen 已提交
150
func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
G
godchen 已提交
151 152 153 154
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.FlushSegments(ctx, req)
	})
	return ret.(*commonpb.Status), err
X
XuanYang-cn 已提交
155
}