client.go 4.8 KB
Newer Older
X
XuanYang-cn 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcdatanodeclient
X
XuanYang-cn 已提交
13 14 15

import (
	"context"
G
godchen 已提交
16
	"errors"
G
godchen 已提交
17
	"fmt"
X
XuanYang-cn 已提交
18

X
Xiangyu Wang 已提交
19 20
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/util/retry"
N
neza2017 已提交
21

G
godchen 已提交
22 23
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
G
godchen 已提交
24
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
X
Xiangyu Wang 已提交
25 26 27 28
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
G
godchen 已提交
29
	"github.com/milvus-io/milvus/internal/util/trace"
30
	"google.golang.org/grpc/codes"
X
XuanYang-cn 已提交
31

X
XuanYang-cn 已提交
32
	"go.uber.org/zap"
X
XuanYang-cn 已提交
33
	"google.golang.org/grpc"
X
XuanYang-cn 已提交
34 35 36
)

type Client struct {
G
godchen 已提交
37 38
	ctx    context.Context
	cancel context.CancelFunc
G
godchen 已提交
39 40 41 42

	grpc datapb.DataNodeClient
	conn *grpc.ClientConn

G
godchen 已提交
43
	addr string
G
godchen 已提交
44

G
godchen 已提交
45
	retryOptions []retry.Option
X
XuanYang-cn 已提交
46
}
X
XuanYang-cn 已提交
47

G
godchen 已提交
48
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
G
godchen 已提交
49
	if addr == "" {
G
godchen 已提交
50
		return nil, fmt.Errorf("address is empty")
G
godchen 已提交
51 52
	}

G
godchen 已提交
53
	ctx, cancel := context.WithCancel(ctx)
G
godchen 已提交
54
	return &Client{
G
godchen 已提交
55 56 57 58
		ctx:          ctx,
		cancel:       cancel,
		addr:         addr,
		retryOptions: retryOptions,
G
godchen 已提交
59
	}, nil
X
XuanYang-cn 已提交
60 61
}

N
neza2017 已提交
62
func (c *Client) Init() error {
63
	Params.Init()
G
godchen 已提交
64
	return c.connect(retry.Attempts(20))
G
godchen 已提交
65 66
}

G
godchen 已提交
67
func (c *Client) connect(retryOptions ...retry.Option) error {
68
	connectGrpcFunc := func() error {
G
godchen 已提交
69 70
		opts := trace.GetInterceptorOpts()
		log.Debug("DataNode connect ", zap.String("address", c.addr))
71
		conn, err := grpc.DialContext(c.ctx, c.addr,
G
godchen 已提交
72
			grpc.WithInsecure(), grpc.WithBlock(),
73 74 75
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
				grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
C
congqixia 已提交
76
			grpc.WithDisableRetry(),
G
godchen 已提交
77
			grpc.WithUnaryInterceptor(
G
godchen 已提交
78
				grpc_middleware.ChainUnaryClient(
79 80 81 82
					grpc_retry.UnaryClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
83 84
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
85
			grpc.WithStreamInterceptor(
G
godchen 已提交
86
				grpc_middleware.ChainStreamClient(
87 88 89 90
					grpc_retry.StreamClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
91 92 93
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
94 95
		if err != nil {
			return err
X
XuanYang-cn 已提交
96
		}
97 98
		c.conn = conn
		return nil
X
XuanYang-cn 已提交
99
	}
100

G
godchen 已提交
101
	err := retry.Do(c.ctx, connectGrpcFunc, retryOptions...)
X
XuanYang-cn 已提交
102
	if err != nil {
103
		log.Debug("DataNodeClient try connect failed", zap.Error(err))
X
XuanYang-cn 已提交
104 105
		return err
	}
106
	log.Debug("DataNodeClient connect success")
X
XuanYang-cn 已提交
107 108
	c.grpc = datapb.NewDataNodeClient(c.conn)
	return nil
X
XuanYang-cn 已提交
109 110
}

G
godchen 已提交
111 112 113 114 115
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
N
neza2017 已提交
116
	log.Debug("DataNode Client grpc error", zap.Error(err))
G
godchen 已提交
117
	err = c.connect()
G
godchen 已提交
118
	if err != nil {
G
godchen 已提交
119
		return ret, errors.New("Connect to datanode failed with error:\n" + err.Error())
G
godchen 已提交
120 121 122 123 124
	}
	ret, err = caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
125 126 127
	return ret, err
}

N
neza2017 已提交
128
func (c *Client) Start() error {
X
XuanYang-cn 已提交
129
	return nil
X
XuanYang-cn 已提交
130 131
}

N
neza2017 已提交
132
func (c *Client) Stop() error {
G
godchen 已提交
133
	c.cancel()
X
XuanYang-cn 已提交
134
	return c.conn.Close()
X
XuanYang-cn 已提交
135 136
}

137 138 139 140 141
// Register is a placeholder: it performs no work and always returns nil.
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
142
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
143 144 145 146
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
	})
	return ret.(*internalpb.ComponentStates), err
N
neza2017 已提交
147 148 149
}

func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
150 151 152 153
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
	})
	return ret.(*milvuspb.StringResponse), err
X
XuanYang-cn 已提交
154 155
}

G
godchen 已提交
156
func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
G
godchen 已提交
157 158 159 160
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.WatchDmChannels(ctx, req)
	})
	return ret.(*commonpb.Status), err
X
XuanYang-cn 已提交
161 162
}

G
godchen 已提交
163
func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
G
godchen 已提交
164 165 166 167
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpc.FlushSegments(ctx, req)
	})
	return ret.(*commonpb.Status), err
X
XuanYang-cn 已提交
168
}