client.go 4.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

C
Cai Yudong 已提交
12
package grpcproxyclient
13 14 15

import (
	"context"
G
godchen 已提交
16
	"fmt"
Z
zhenshan.cao 已提交
17
	"time"
18

G
godchen 已提交
19 20
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
G
godchen 已提交
21
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
X
Xiangyu Wang 已提交
22 23 24 25 26 27
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/proxypb"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
28
	"github.com/milvus-io/milvus/internal/util/trace"
29 30
	"go.uber.org/zap"
	"google.golang.org/grpc"
31 32 33
)

type Client struct {
G
godchen 已提交
34 35 36
	ctx    context.Context
	cancel context.CancelFunc

37
	grpcClient proxypb.ProxyClient
G
godchen 已提交
38 39
	conn       *grpc.ClientConn

G
godchen 已提交
40 41 42
	addr string

	retryOptions []retry.Option
43 44
}

G
godchen 已提交
45 46 47
func NewClient(ctx context.Context, addr string, retryOptions ...retry.Option) (*Client, error) {
	if addr == "" {
		return nil, fmt.Errorf("address is empty")
G
godchen 已提交
48
	}
G
godchen 已提交
49 50 51 52 53 54 55 56
	ctx, cancel := context.WithCancel(ctx)

	return &Client{
		ctx:          ctx,
		cancel:       cancel,
		addr:         addr,
		retryOptions: retryOptions,
	}, nil
T
ThreadDao 已提交
57 58
}

Z
zhenshan.cao 已提交
59
func (c *Client) Init() error {
G
godchen 已提交
60
	return c.connect()
Z
zhenshan.cao 已提交
61 62
}

63
func (c *Client) connect() error {
G
godchen 已提交
64
	connectGrpcFunc := func() error {
G
godchen 已提交
65
		opts := trace.GetInterceptorOpts()
G
godchen 已提交
66 67 68
		log.Debug("ProxyNodeClient try connect ", zap.String("address", c.addr))
		conn, err := grpc.DialContext(c.ctx, c.addr,
			grpc.WithInsecure(), grpc.WithBlock(), grpc.WithTimeout(5*time.Second),
G
godchen 已提交
69
			grpc.WithUnaryInterceptor(
G
godchen 已提交
70 71 72 73
				grpc_middleware.ChainUnaryClient(
					grpc_retry.UnaryClientInterceptor(),
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
74
			grpc.WithStreamInterceptor(
G
godchen 已提交
75 76 77 78 79
				grpc_middleware.ChainStreamClient(
					grpc_retry.StreamClientInterceptor(),
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
G
godchen 已提交
80 81 82 83 84 85 86
		if err != nil {
			return err
		}
		c.conn = conn
		return nil
	}

G
godchen 已提交
87
	err := retry.Do(c.ctx, connectGrpcFunc, c.retryOptions...)
G
godchen 已提交
88
	if err != nil {
89
		log.Debug("ProxyClient try connect failed", zap.Error(err))
G
godchen 已提交
90 91
		return err
	}
92
	log.Debug("ProxyClient connect success")
93
	c.grpcClient = proxypb.NewProxyClient(c.conn)
G
godchen 已提交
94 95 96 97 98 99 100 101
	return nil
}

func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
102
	err = c.connect()
G
godchen 已提交
103
	if err != nil {
G
godchen 已提交
104
		return ret, err
G
godchen 已提交
105 106 107 108 109
	}
	ret, err = caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
110 111 112
	return ret, err
}

Z
zhenshan.cao 已提交
113 114 115 116 117
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
118 119 120
	return nil
}

121 122 123 124 125
// Register dummy
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
126
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
127 128 129 130
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
	})
	return ret.(*internalpb.ComponentStates), err
131 132
}

G
godchen 已提交
133
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
134 135 136 137
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
	})
	return ret.(*milvuspb.StringResponse), err
G
godchen 已提交
138 139 140
}

func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
G
godchen 已提交
141 142 143 144
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.InvalidateCollectionMetaCache(ctx, req)
	})
	return ret.(*commonpb.Status), err
145
}
146 147 148 149 150 151

func (c *Client) ReleaseDQLMessageStream(ctx context.Context, req *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) {
	ret, err := c.recall(func() (interface{}, error) {
		return c.grpcClient.ReleaseDQLMessageStream(ctx, req)
	})
	return ret.(*commonpb.Status), err
N
neza2017 已提交
152
}