// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package grpcproxyclient

import (
	"context"
	"errors"
	"fmt"
	"time"

	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/trace"
	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

type Client struct {
G
godchen 已提交
36 37 38
	ctx    context.Context
	cancel context.CancelFunc

39
	grpcClient proxypb.ProxyClient
G
godchen 已提交
40 41
	conn       *grpc.ClientConn

G
godchen 已提交
42
	addr string
43 44
}

func NewClient(ctx context.Context, addr string) (*Client, error) {
G
godchen 已提交
46 47
	if addr == "" {
		return nil, fmt.Errorf("address is empty")
G
godchen 已提交
48
	}
G
godchen 已提交
49 50 51
	ctx, cancel := context.WithCancel(ctx)

	return &Client{
G
godchen 已提交
52 53 54
		ctx:    ctx,
		cancel: cancel,
		addr:   addr,
G
godchen 已提交
55
	}, nil
T
ThreadDao 已提交
56 57
}

func (c *Client) Init() error {
59
	Params.Init()
G
godchen 已提交
60
	return c.connect(retry.Attempts(20))
Z
zhenshan.cao 已提交
61 62
}

func (c *Client) connect(retryOptions ...retry.Option) error {
G
godchen 已提交
64
	connectGrpcFunc := func() error {
G
godchen 已提交
65
		opts := trace.GetInterceptorOpts()
C
Cai Yudong 已提交
66
		log.Debug("ProxyClient try connect ", zap.String("address", c.addr))
G
godchen 已提交
67
		conn, err := grpc.DialContext(c.ctx, c.addr,
G
godchen 已提交
68
			grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(2*time.Second),
69 70 71
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
				grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
G
godchen 已提交
72
			grpc.WithUnaryInterceptor(
G
godchen 已提交
73
				grpc_middleware.ChainUnaryClient(
74 75 76 77 78
					grpc_retry.UnaryClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithPerRetryTimeout(time.Second*3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
79 80
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
81
			grpc.WithStreamInterceptor(
G
godchen 已提交
82
				grpc_middleware.ChainStreamClient(
83 84 85 86 87
					grpc_retry.StreamClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithPerRetryTimeout(time.Second*3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
88 89 90
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
G
godchen 已提交
91 92 93 94 95 96 97
		if err != nil {
			return err
		}
		c.conn = conn
		return nil
	}

G
godchen 已提交
98
	err := retry.Do(c.ctx, connectGrpcFunc, retryOptions...)
G
godchen 已提交
99
	if err != nil {
100
		log.Debug("ProxyClient try connect failed", zap.Error(err))
G
godchen 已提交
101 102
		return err
	}
103
	log.Debug("ProxyClient connect success")
104
	c.grpcClient = proxypb.NewProxyClient(c.conn)
G
godchen 已提交
105 106 107 108 109 110 111 112
	return nil
}

func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
N
neza2017 已提交
113
	log.Debug("Proxy Client grpc error", zap.Error(err))
G
godchen 已提交
114
	err = c.connect()
G
godchen 已提交
115
	if err != nil {
G
godchen 已提交
116
		return ret, errors.New("Connect to proxy failed with error:\n" + err.Error())
G
godchen 已提交
117 118 119 120 121
	}
	ret, err = caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
122 123 124
	return ret, err
}

// Start is a no-op for this client; it always returns nil.
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
130 131 132
	return nil
}

// Register is a placeholder that performs no registration; it always returns
// nil.
func (c *Client) Register() error {
	return nil
}

func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
139 140 141 142
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
	})
	return ret.(*internalpb.ComponentStates), err
143 144
}

func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
146 147 148 149
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
	})
	return ret.(*milvuspb.StringResponse), err
G
godchen 已提交
150 151 152
}

func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
G
godchen 已提交
153 154 155 156
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.InvalidateCollectionMetaCache(ctx, req)
	})
	return ret.(*commonpb.Status), err
157
}

func (c *Client) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.ReleaseDQLMessageStream(ctx, req)
	})
	return ret.(*commonpb.Status), err
N
neza2017 已提交
164
}