client.go 11.9 KB
Newer Older
C
cai.zhang 已提交
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package grpcdatacoordclient
S
sunby 已提交
13 14 15

import (
	"context"
G
godchen 已提交
16
	"fmt"
D
dragondriver 已提交
17
	"sync"
G
godchen 已提交
18
	"time"
S
sunby 已提交
19

G
godchen 已提交
20 21
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
G
godchen 已提交
22
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
X
Xiangyu Wang 已提交
23 24
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
25
	"github.com/milvus-io/milvus/internal/types"
X
Xiangyu Wang 已提交
26
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
27
	"github.com/milvus-io/milvus/internal/util/sessionutil"
G
godchen 已提交
28
	"github.com/milvus-io/milvus/internal/util/trace"
G
godchen 已提交
29
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
30
	"go.uber.org/zap"
S
sunby 已提交
31
	"google.golang.org/grpc"
32
	"google.golang.org/grpc/codes"
S
sunby 已提交
33

X
Xiangyu Wang 已提交
34 35 36
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
S
sunby 已提交
37 38
)

39 40 41 42 43 44 45 46 47 48
// Base is the interface abstracted from components: it embeds the
// types.DataCoord service interface and adds the lifecycle methods
// declared below.
type Base interface {
	types.DataCoord

	// Init prepares the component for use.
	Init() error
	// Start starts the component.
	Start() error
	// Stop shuts the component down.
	Stop() error
	// Register registers the component.
	Register() error
}

S
sunby 已提交
49
type Client struct {
G
godchen 已提交
50 51 52
	ctx    context.Context
	cancel context.CancelFunc

D
dragondriver 已提交
53 54 55
	grpcClient    datapb.DataCoordClient
	conn          *grpc.ClientConn
	grpcClientMtx sync.RWMutex
G
godchen 已提交
56

G
godchen 已提交
57
	sess *sessionutil.Session
G
godchen 已提交
58
	addr string
59 60 61 62 63 64

	getGrpcClient func() (datapb.DataCoordClient, error)
}

func (c *Client) setGetGrpcClientFunc() {
	c.getGrpcClient = c.getGrpcClientFunc
G
godchen 已提交
65 66
}

67
func (c *Client) getGrpcClientFunc() (datapb.DataCoordClient, error) {
D
dragondriver 已提交
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
	c.grpcClientMtx.RLock()
	if c.grpcClient != nil {
		defer c.grpcClientMtx.RUnlock()
		return c.grpcClient, nil
	}
	c.grpcClientMtx.RUnlock()

	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()

	if c.grpcClient != nil {
		return c.grpcClient, nil
	}

	// FIXME(dragondriver): how to handle error here?
	// if we return nil here, then we should check if client is nil outside,
	err := c.connect(retry.Attempts(20))
	if err != nil {
		return nil, err
	}

	return c.grpcClient, nil
}

// resetConnection closes the current connection, if any, and clears both the
// connection and the stub so that the next getGrpcClient call reconnects.
// The error returned by Close is deliberately ignored.
func (c *Client) resetConnection() {
	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()

	if old := c.conn; old != nil {
		_ = old.Close()
	}
	c.conn, c.grpcClient = nil, nil
}

103
func getDataCoordAddress(sess *sessionutil.Session) (string, error) {
104
	key := typeutil.DataCoordRole
G
godchen 已提交
105 106
	msess, _, err := sess.GetSessions(key)
	if err != nil {
107
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
108 109 110 111
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
112 113
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
114 115
	}
	return ms.Address, nil
S
sunby 已提交
116 117
}

G
godchen 已提交
118
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*Client, error) {
G
godchen 已提交
119 120 121 122 123
	sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
		return nil, err
S
sunby 已提交
124
	}
G
godchen 已提交
125
	ctx, cancel := context.WithCancel(ctx)
126
	client := &Client{
G
godchen 已提交
127 128 129
		ctx:    ctx,
		cancel: cancel,
		sess:   sess,
130 131 132 133
	}

	client.setGetGrpcClientFunc()
	return client, nil
S
sunby 已提交
134 135 136
}

func (c *Client) Init() error {
137
	Params.Init()
D
dragondriver 已提交
138
	return nil
G
godchen 已提交
139 140
}

G
godchen 已提交
141
func (c *Client) connect(retryOptions ...retry.Option) error {
G
godchen 已提交
142
	var err error
G
godchen 已提交
143
	connectDataCoordFn := func() error {
144
		c.addr, err = getDataCoordAddress(c.sess)
G
godchen 已提交
145
		if err != nil {
G
godchen 已提交
146
			log.Debug("DataCoordClient getDataCoordAddr failed", zap.Error(err))
G
godchen 已提交
147 148
			return err
		}
G
godchen 已提交
149
		opts := trace.GetInterceptorOpts()
150
		log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
G
godchen 已提交
151 152 153
		ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second)
		defer cancel()
		conn, err := grpc.DialContext(ctx, c.addr,
G
godchen 已提交
154
			grpc.WithInsecure(), grpc.WithBlock(),
155 156 157
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
				grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
G
godchen 已提交
158
			grpc.WithUnaryInterceptor(
G
godchen 已提交
159
				grpc_middleware.ChainUnaryClient(
160 161 162 163
					grpc_retry.UnaryClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
164 165
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
166
			grpc.WithStreamInterceptor(
G
godchen 已提交
167
				grpc_middleware.ChainStreamClient(
168 169 170
					grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
171 172 173
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
174 175
		if err != nil {
			return err
S
sunby 已提交
176
		}
D
dragondriver 已提交
177 178 179
		if c.conn != nil {
			_ = c.conn.Close()
		}
180 181
		c.conn = conn
		return nil
S
sunby 已提交
182
	}
183

G
godchen 已提交
184
	err = retry.Do(c.ctx, connectDataCoordFn, retryOptions...)
S
sunby 已提交
185
	if err != nil {
186
		log.Debug("DataCoord try reconnect failed", zap.Error(err))
S
sunby 已提交
187 188
		return err
	}
189
	c.grpcClient = datapb.NewDataCoordClient(c.conn)
S
sunby 已提交
190 191 192
	return nil
}

G
godchen 已提交
193 194 195 196 197
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
N
neza2017 已提交
198
	log.Debug("DataCoord Client grpc error", zap.Error(err))
D
dragondriver 已提交
199 200 201

	c.resetConnection()

G
godchen 已提交
202 203 204 205
	ret, err = caller()
	if err == nil {
		return ret, nil
	}
G
godchen 已提交
206 207 208
	return ret, err
}

S
sunby 已提交
209 210 211 212 213
// Start is a no-op and always returns nil.
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
G
godchen 已提交
214
	c.cancel()
D
dragondriver 已提交
215 216 217 218 219 220
	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()
	if c.conn != nil {
		return c.conn.Close()
	}
	return nil
S
sunby 已提交
221 222
}

223 224 225 226 227
// Register is a dummy implementation: it does nothing and always returns nil.
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
228
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
229
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
230 231 232 233 234 235
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
236
	})
D
dragondriver 已提交
237 238 239
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
240
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
241 242
}

G
godchen 已提交
243
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
244
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
245 246 247 248 249 250
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
251
	})
D
dragondriver 已提交
252 253 254
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
255
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
256 257
}

G
godchen 已提交
258
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
259
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
260 261 262 263 264 265
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
266
	})
D
dragondriver 已提交
267 268 269
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
270
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
271 272
}

273
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
G
godchen 已提交
274
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
275 276 277 278 279 280
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.Flush(ctx, req)
G
godchen 已提交
281
	})
D
dragondriver 已提交
282 283 284
	if err != nil || ret == nil {
		return nil, err
	}
285
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
286 287
}

G
godchen 已提交
288
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
G
godchen 已提交
289
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
290 291 292 293 294 295
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.AssignSegmentID(ctx, req)
G
godchen 已提交
296
	})
D
dragondriver 已提交
297 298 299
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
300
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
301 302
}

G
godchen 已提交
303
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
G
godchen 已提交
304
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
305 306 307 308 309 310
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetSegmentStates(ctx, req)
G
godchen 已提交
311
	})
D
dragondriver 已提交
312 313 314
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
315
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
316 317
}

G
godchen 已提交
318
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
G
godchen 已提交
319
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
320 321 322 323 324 325
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
326
	})
D
dragondriver 已提交
327 328 329
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
330
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
331 332
}

G
godchen 已提交
333
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
G
godchen 已提交
334
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
335 336 337 338 339 340
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetCollectionStatistics(ctx, req)
G
godchen 已提交
341
	})
D
dragondriver 已提交
342 343 344
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
345
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
346 347
}

G
godchen 已提交
348
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
G
godchen 已提交
349
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
350 351 352 353 354 355
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetPartitionStatistics(ctx, req)
G
godchen 已提交
356
	})
D
dragondriver 已提交
357 358 359
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
360
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
361
}
N
neza2017 已提交
362

G
godchen 已提交
363
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
364
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
365 366 367 368 369 370
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
371
	})
D
dragondriver 已提交
372 373 374
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
375
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
376
}
377

G
godchen 已提交
378
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
G
godchen 已提交
379
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
380 381 382 383 384 385
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetSegmentInfo(ctx, req)
G
godchen 已提交
386
	})
D
dragondriver 已提交
387 388 389
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
390
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
391
}
392

393
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
D
dragondriver 已提交
394 395 396 397 398 399 400 401 402 403
	// FIXME(dragondriver): why not to recall here?
	client, err := c.getGrpcClient()
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	return client.SaveBinlogPaths(ctx, req)
404 405
}

406 407
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
408 409 410 411 412 413
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetRecoveryInfo(ctx, req)
414
	})
D
dragondriver 已提交
415 416 417
	if err != nil || ret == nil {
		return nil, err
	}
418 419
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
420 421 422

func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
423 424 425 426 427 428
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetFlushedSegments(ctx, req)
429
	})
D
dragondriver 已提交
430 431 432
	if err != nil || ret == nil {
		return nil, err
	}
433 434
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
435 436 437

func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
438 439 440 441 442 443
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}

		return client.GetMetrics(ctx, req)
444
	})
D
dragondriver 已提交
445 446 447
	if err != nil || ret == nil {
		return nil, err
	}
448 449
	return ret.(*milvuspb.GetMetricsResponse), err
}