client.go 21.6 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
C
cai.zhang 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
C
cai.zhang 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
C
cai.zhang 已提交
16

17
package grpcdatacoordclient
S
sunby 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
D
dragondriver 已提交
22
	"sync"
G
godchen 已提交
23
	"time"
S
sunby 已提交
24

G
godchen 已提交
25 26
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
G
godchen 已提交
27
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
30
	"github.com/milvus-io/milvus/internal/util/funcutil"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
32
	"github.com/milvus-io/milvus/internal/util/sessionutil"
G
godchen 已提交
33
	"github.com/milvus-io/milvus/internal/util/trace"
G
godchen 已提交
34
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
35
	"go.uber.org/zap"
S
sunby 已提交
36
	"google.golang.org/grpc"
37
	"google.golang.org/grpc/codes"
38
	"google.golang.org/grpc/keepalive"
S
sunby 已提交
39

X
Xiangyu Wang 已提交
40 41 42
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
S
sunby 已提交
43 44
)

45
// Client is the datacoord grpc client
S
sunby 已提交
46
type Client struct {
G
godchen 已提交
47 48 49
	ctx    context.Context
	cancel context.CancelFunc

D
dragondriver 已提交
50 51 52
	grpcClient    datapb.DataCoordClient
	conn          *grpc.ClientConn
	grpcClientMtx sync.RWMutex
G
godchen 已提交
53

G
godchen 已提交
54
	sess *sessionutil.Session
G
godchen 已提交
55
	addr string
56 57 58 59 60 61

	getGrpcClient func() (datapb.DataCoordClient, error)
}

func (c *Client) setGetGrpcClientFunc() {
	c.getGrpcClient = c.getGrpcClientFunc
G
godchen 已提交
62 63
}

64
func (c *Client) getGrpcClientFunc() (datapb.DataCoordClient, error) {
D
dragondriver 已提交
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
	c.grpcClientMtx.RLock()
	if c.grpcClient != nil {
		defer c.grpcClientMtx.RUnlock()
		return c.grpcClient, nil
	}
	c.grpcClientMtx.RUnlock()

	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()

	if c.grpcClient != nil {
		return c.grpcClient, nil
	}

	// FIXME(dragondriver): how to handle error here?
	// if we return nil here, then we should check if client is nil outside,
	err := c.connect(retry.Attempts(20))
	if err != nil {
		return nil, err
	}

	return c.grpcClient, nil
}

// resetConnection closes the current connection, if any, and clears both the
// connection and the cached client so that the next getGrpcClientFunc call
// has to connect again. The write lock is held for the whole operation.
func (c *Client) resetConnection() {
	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()

	if old := c.conn; old != nil {
		// The connection is being discarded, so a close error is ignored.
		_ = old.Close()
	}
	c.conn, c.grpcClient = nil, nil
}

100
func getDataCoordAddress(sess *sessionutil.Session) (string, error) {
101
	key := typeutil.DataCoordRole
G
godchen 已提交
102 103
	msess, _, err := sess.GetSessions(key)
	if err != nil {
104
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
105 106 107 108
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
109 110
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
111 112
	}
	return ms.Address, nil
S
sunby 已提交
113 114
}

S
sunby 已提交
115
// NewClient creates a new client instance
G
godchen 已提交
116
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*Client, error) {
G
godchen 已提交
117 118 119 120 121
	sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
		return nil, err
S
sunby 已提交
122
	}
G
godchen 已提交
123
	ctx, cancel := context.WithCancel(ctx)
124
	client := &Client{
G
godchen 已提交
125 126 127
		ctx:    ctx,
		cancel: cancel,
		sess:   sess,
128 129 130 131
	}

	client.setGetGrpcClientFunc()
	return client, nil
S
sunby 已提交
132 133
}

S
sunby 已提交
134
// Init initializes the client
S
sunby 已提交
135
func (c *Client) Init() error {
136
	Params.Init()
D
dragondriver 已提交
137
	return nil
G
godchen 已提交
138 139
}

G
godchen 已提交
140
func (c *Client) connect(retryOptions ...retry.Option) error {
141 142 143 144 145 146
	var kacp = keepalive.ClientParameters{
		Time:                60 * time.Second, // send pings every 60 seconds if there is no activity
		Timeout:             6 * time.Second,  // wait 6 second for ping ack before considering the connection dead
		PermitWithoutStream: true,             // send pings even without active streams
	}

G
godchen 已提交
147
	var err error
G
godchen 已提交
148
	connectDataCoordFn := func() error {
149
		c.addr, err = getDataCoordAddress(c.sess)
G
godchen 已提交
150
		if err != nil {
G
godchen 已提交
151
			log.Debug("DataCoordClient getDataCoordAddr failed", zap.Error(err))
G
godchen 已提交
152 153
			return err
		}
G
godchen 已提交
154
		opts := trace.GetInterceptorOpts()
155
		log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
G
godchen 已提交
156 157 158
		ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second)
		defer cancel()
		conn, err := grpc.DialContext(ctx, c.addr,
159
			grpc.WithKeepaliveParams(kacp),
G
godchen 已提交
160
			grpc.WithInsecure(), grpc.WithBlock(),
161 162 163
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
				grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
G
godchen 已提交
164
			grpc.WithUnaryInterceptor(
G
godchen 已提交
165
				grpc_middleware.ChainUnaryClient(
166 167 168 169
					grpc_retry.UnaryClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
170 171
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
172
			grpc.WithStreamInterceptor(
G
godchen 已提交
173
				grpc_middleware.ChainStreamClient(
174 175 176
					grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
177 178 179
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
180 181
		if err != nil {
			return err
S
sunby 已提交
182
		}
D
dragondriver 已提交
183 184 185
		if c.conn != nil {
			_ = c.conn.Close()
		}
186 187
		c.conn = conn
		return nil
S
sunby 已提交
188
	}
189

G
godchen 已提交
190
	err = retry.Do(c.ctx, connectDataCoordFn, retryOptions...)
S
sunby 已提交
191
	if err != nil {
192
		log.Debug("DataCoord try reconnect failed", zap.Error(err))
S
sunby 已提交
193 194
		return err
	}
195
	c.grpcClient = datapb.NewDataCoordClient(c.conn)
S
sunby 已提交
196 197 198
	return nil
}

G
godchen 已提交
199 200 201 202 203
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
204
	if err == context.Canceled || err == context.DeadlineExceeded {
205
		return nil, fmt.Errorf("err: %s\n, %s", err.Error(), trace.StackTrace())
206 207
	}

N
neza2017 已提交
208
	log.Debug("DataCoord Client grpc error", zap.Error(err))
D
dragondriver 已提交
209 210 211

	c.resetConnection()

G
godchen 已提交
212
	ret, err = caller()
213 214 215 216
	if err != nil {
		return nil, fmt.Errorf("err: %s\n, %s", err.Error(), trace.StackTrace())
	}

G
godchen 已提交
217 218 219
	return ret, err
}

220
// Start enables the client
S
sunby 已提交
221 222 223 224 225
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
G
godchen 已提交
226
	c.cancel()
D
dragondriver 已提交
227 228 229 230 231 232
	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()
	if c.conn != nil {
		return c.conn.Close()
	}
	return nil
S
sunby 已提交
233 234
}

235 236 237 238 239
// Register is a dummy implementation: it does nothing and always returns nil.
// NOTE(review): presumably present only to satisfy a component interface — confirm.
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
240
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
241
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
242 243 244 245
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
246 247 248
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
249
		return client.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
250
	})
D
dragondriver 已提交
251 252 253
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
254
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
255 256
}

257
// GetTimeTickChannel return the name of time tick channel.
G
godchen 已提交
258
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
259
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
260 261 262 263
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
264 265 266
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
267
		return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
268
	})
D
dragondriver 已提交
269 270 271
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
272
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
273 274
}

275
// GetStatisticsChannel return the name of statistics channel.
G
godchen 已提交
276
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
277
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
278 279 280 281
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
282 283 284
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
285
		return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
286
	})
D
dragondriver 已提交
287 288 289
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
290
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
291 292
}

293
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
G
godchen 已提交
294
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
295 296 297 298
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
299 300 301
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
302
		return client.Flush(ctx, req)
G
godchen 已提交
303
	})
D
dragondriver 已提交
304 305 306
	if err != nil || ret == nil {
		return nil, err
	}
307
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
308 309
}

310 311 312 313 314 315 316 317 318 319 320 321 322
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
// `AssignSegmentID` will applies current configured allocation policies for each request
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
G
godchen 已提交
323
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
G
godchen 已提交
324
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
325 326 327 328
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
329 330 331
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
332
		return client.AssignSegmentID(ctx, req)
G
godchen 已提交
333
	})
D
dragondriver 已提交
334 335 336
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
337
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
338 339
}

340 341 342 343 344 345 346 347 348
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment id to query
//
// response struct `GetSegmentStatesResponse` contains the list of each state query result
// 	when the segment is not found, the state entry will has the field `Status`  to identify failure
// 	otherwise the Segment State and Start position information will be returned
// error is returned only when some communication issue occurs
G
godchen 已提交
349
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
G
godchen 已提交
350
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
351 352 353 354
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
355 356 357
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
358
		return client.GetSegmentStates(ctx, req)
G
godchen 已提交
359
	})
D
dragondriver 已提交
360 361 362
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
363
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
364 365
}

366 367 368 369 370 371 372 373
// GetInsertBinlogPaths requests binlog paths for specified segment
//
// ctx is the context to control request deadline and cancellation
// req contains the segment id to query
//
// response struct `GetInsertBinlogPathsResponse` contains the fields list
// 	and corresponding binlog path list
// error is returned only when some communication issue occurs
G
godchen 已提交
374
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
G
godchen 已提交
375
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
376 377 378 379
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
380 381 382
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
383
		return client.GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
384
	})
D
dragondriver 已提交
385 386 387
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
388
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
389 390
}

391 392 393 394 395 396 397 398
// GetCollectionStatistics requests collection statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection id to query
//
// response struct `GetCollectionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
399
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
G
godchen 已提交
400
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
401 402 403 404
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
405 406 407
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
408
		return client.GetCollectionStatistics(ctx, req)
G
godchen 已提交
409
	})
D
dragondriver 已提交
410 411 412
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
413
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
414 415
}

416 417 418 419 420 421 422 423
// GetPartitionStatistics requests partition statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection and partition id to query
//
// response struct `GetPartitionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
424
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
G
godchen 已提交
425
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
426 427 428 429
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
430 431 432
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
433
		return client.GetPartitionStatistics(ctx, req)
G
godchen 已提交
434
	})
D
dragondriver 已提交
435 436 437
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
438
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
439
}
N
neza2017 已提交
440

441 442
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
G
godchen 已提交
443
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
444
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
445 446 447 448
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
449 450 451
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
452
		return client.GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
453
	})
D
dragondriver 已提交
454 455 456
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
457
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
458
}
459

460 461 462 463 464 465 466
// GetSegmentInfo requests segment info
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `GetSegmentInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
G
godchen 已提交
467
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
G
godchen 已提交
468
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
469 470 471 472
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
473 474 475
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
476
		return client.GetSegmentInfo(ctx, req)
G
godchen 已提交
477
	})
D
dragondriver 已提交
478 479 480
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
481
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
482
}
483

484 485 486 487 488 489 490 491 492 493 494 495
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
//  and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
//
// there is a constraint that the `SaveBinlogPaths` requests of same segment shall be passed in sequence
// 	the root reason is each `SaveBinlogPaths` will overwrite the checkpoint position
//  if the constraint is broken, the checkpoint position will not be monotonically increasing and the integrity will be compromised
496
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
D
dragondriver 已提交
497 498 499 500 501 502 503 504 505 506
	// FIXME(dragondriver): why not to recall here?
	client, err := c.getGrpcClient()
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	return client.SaveBinlogPaths(ctx, req)
507 508
}

509 510 511 512 513 514 515
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response struct `GetRecoveryInfoResponse` contains the list of segments info and corresponding vchannel info
// error is returned only when some communication issue occurs
516 517
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
518 519 520 521
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
522 523 524
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
525
		return client.GetRecoveryInfo(ctx, req)
526
	})
D
dragondriver 已提交
527 528 529
	if err != nil || ret == nil {
		return nil, err
	}
530 531
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
532

533 534 535 536 537 538 539 540
// GetFlushedSegments returns flushed segment list of requested collection/parition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//  when partition is lesser or equal to 0, all flushed segments of collection will be returned
//
// response struct `GetFlushedSegmentsResponse` contains flushed segment id list
// error is returned only when some communication issue occurs
541 542
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
543 544 545 546
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
547 548 549
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
550
		return client.GetFlushedSegments(ctx, req)
551
	})
D
dragondriver 已提交
552 553 554
	if err != nil || ret == nil {
		return nil, err
	}
555 556
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
557

558
// GetMetrics gets all metrics of datacoord
559 560
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
561 562 563 564
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
565 566 567
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
568
		return client.GetMetrics(ctx, req)
569
	})
D
dragondriver 已提交
570 571 572
	if err != nil || ret == nil {
		return nil, err
	}
573 574
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
575 576 577 578 579 580 581

func (c *Client) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
582 583 584
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
S
sunby 已提交
585 586 587 588 589 590 591 592
		return client.CompleteCompaction(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}

593
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
S
sunby 已提交
594 595 596 597 598
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
599 600 601
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
S
sunby 已提交
602 603 604 605 606
		return client.ManualCompaction(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
607
	return ret.(*milvuspb.ManualCompactionResponse), err
S
sunby 已提交
608 609
}

610
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
S
sunby 已提交
611 612 613 614 615
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
616 617 618
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
S
sunby 已提交
619 620 621 622 623
		return client.GetCompactionState(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
624 625 626 627 628 629 630 631 632
	return ret.(*milvuspb.GetCompactionStateResponse), err
}

func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
633 634 635
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
636 637 638 639 640 641
		return client.GetCompactionStateWithPlans(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetCompactionPlansResponse), err
S
sunby 已提交
642
}
G
godchen 已提交
643 644 645 646 647 648 649

func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
650 651 652
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
G
godchen 已提交
653 654 655 656 657 658 659
		return client.WatchChannels(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.WatchChannelsResponse), err
}
B
Bingyi Sun 已提交
660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677

// GetFlushState gets the flush state of multiple segments.
//
// The call goes through recall: a failure other than ctx being cancelled or
// past its deadline resets the connection and is retried once. On error the
// response is nil.
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
	invoke := func() (interface{}, error) {
		cli, cliErr := c.getGrpcClient()
		if cliErr != nil {
			return nil, cliErr
		}
		// Skip the RPC when ctx is already done.
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		return cli.GetFlushState(ctx, req)
	}

	resp, err := c.recall(invoke)
	if err != nil || resp == nil {
		return nil, err
	}
	return resp.(*milvuspb.GetFlushStateResponse), nil
}
678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695

// DropVirtualChannel drops a virtual channel in DataCoord.
//
// The call goes through recall: a failure other than ctx being cancelled or
// past its deadline resets the connection and is retried once. On error the
// response is nil.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
	invoke := func() (interface{}, error) {
		cli, cliErr := c.getGrpcClient()
		if cliErr != nil {
			return nil, cliErr
		}
		// Skip the RPC when ctx is already done.
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		return cli.DropVirtualChannel(ctx, req)
	}

	resp, err := c.recall(invoke)
	if err != nil || resp == nil {
		return nil, err
	}
	return resp.(*datapb.DropVirtualChannelResponse), nil
}