client.go 20.0 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
C
cai.zhang 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
C
cai.zhang 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
C
cai.zhang 已提交
16

17
package grpcdatacoordclient
S
sunby 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
D
dragondriver 已提交
22
	"sync"
G
godchen 已提交
23
	"time"
S
sunby 已提交
24

G
godchen 已提交
25 26
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
G
godchen 已提交
27
	grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
30
	"github.com/milvus-io/milvus/internal/util/funcutil"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
32
	"github.com/milvus-io/milvus/internal/util/sessionutil"
G
godchen 已提交
33
	"github.com/milvus-io/milvus/internal/util/trace"
G
godchen 已提交
34
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
35
	"go.uber.org/zap"
S
sunby 已提交
36
	"google.golang.org/grpc"
37
	"google.golang.org/grpc/codes"
S
sunby 已提交
38

X
Xiangyu Wang 已提交
39 40 41
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
S
sunby 已提交
42 43
)

44
// Client is the datacoord grpc client
S
sunby 已提交
45
type Client struct {
G
godchen 已提交
46 47 48
	ctx    context.Context
	cancel context.CancelFunc

D
dragondriver 已提交
49 50 51
	grpcClient    datapb.DataCoordClient
	conn          *grpc.ClientConn
	grpcClientMtx sync.RWMutex
G
godchen 已提交
52

G
godchen 已提交
53
	sess *sessionutil.Session
G
godchen 已提交
54
	addr string
55 56 57 58 59 60

	getGrpcClient func() (datapb.DataCoordClient, error)
}

func (c *Client) setGetGrpcClientFunc() {
	c.getGrpcClient = c.getGrpcClientFunc
G
godchen 已提交
61 62
}

63
func (c *Client) getGrpcClientFunc() (datapb.DataCoordClient, error) {
D
dragondriver 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
	c.grpcClientMtx.RLock()
	if c.grpcClient != nil {
		defer c.grpcClientMtx.RUnlock()
		return c.grpcClient, nil
	}
	c.grpcClientMtx.RUnlock()

	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()

	if c.grpcClient != nil {
		return c.grpcClient, nil
	}

	// FIXME(dragondriver): how to handle error here?
	// if we return nil here, then we should check if client is nil outside,
	err := c.connect(retry.Attempts(20))
	if err != nil {
		return nil, err
	}

	return c.grpcClient, nil
}

func (c *Client) resetConnection() {
	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()

	if c.conn != nil {
		_ = c.conn.Close()
	}
	c.conn = nil
	c.grpcClient = nil
}

99
func getDataCoordAddress(sess *sessionutil.Session) (string, error) {
100
	key := typeutil.DataCoordRole
G
godchen 已提交
101 102
	msess, _, err := sess.GetSessions(key)
	if err != nil {
103
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
104 105 106 107
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
108 109
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
110 111
	}
	return ms.Address, nil
S
sunby 已提交
112 113
}

S
sunby 已提交
114
// NewClient creates a new client instance
G
godchen 已提交
115
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*Client, error) {
G
godchen 已提交
116 117 118 119 120
	sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
		return nil, err
S
sunby 已提交
121
	}
G
godchen 已提交
122
	ctx, cancel := context.WithCancel(ctx)
123
	client := &Client{
G
godchen 已提交
124 125 126
		ctx:    ctx,
		cancel: cancel,
		sess:   sess,
127 128 129 130
	}

	client.setGetGrpcClientFunc()
	return client, nil
S
sunby 已提交
131 132
}

S
sunby 已提交
133
// Init initializes the client
S
sunby 已提交
134
func (c *Client) Init() error {
135
	Params.Init()
D
dragondriver 已提交
136
	return nil
G
godchen 已提交
137 138
}

G
godchen 已提交
139
func (c *Client) connect(retryOptions ...retry.Option) error {
G
godchen 已提交
140
	var err error
G
godchen 已提交
141
	connectDataCoordFn := func() error {
142
		c.addr, err = getDataCoordAddress(c.sess)
G
godchen 已提交
143
		if err != nil {
G
godchen 已提交
144
			log.Debug("DataCoordClient getDataCoordAddr failed", zap.Error(err))
G
godchen 已提交
145 146
			return err
		}
G
godchen 已提交
147
		opts := trace.GetInterceptorOpts()
148
		log.Debug("DataCoordClient try reconnect ", zap.String("address", c.addr))
G
godchen 已提交
149 150 151
		ctx, cancel := context.WithTimeout(c.ctx, 15*time.Second)
		defer cancel()
		conn, err := grpc.DialContext(ctx, c.addr,
G
godchen 已提交
152
			grpc.WithInsecure(), grpc.WithBlock(),
153 154 155
			grpc.WithDefaultCallOptions(
				grpc.MaxCallRecvMsgSize(Params.ClientMaxRecvSize),
				grpc.MaxCallSendMsgSize(Params.ClientMaxSendSize)),
G
godchen 已提交
156
			grpc.WithUnaryInterceptor(
G
godchen 已提交
157
				grpc_middleware.ChainUnaryClient(
158 159 160 161
					grpc_retry.UnaryClientInterceptor(
						grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
162 163
					grpc_opentracing.UnaryClientInterceptor(opts...),
				)),
G
godchen 已提交
164
			grpc.WithStreamInterceptor(
G
godchen 已提交
165
				grpc_middleware.ChainStreamClient(
166 167 168
					grpc_retry.StreamClientInterceptor(grpc_retry.WithMax(3),
						grpc_retry.WithCodes(codes.Aborted, codes.Unavailable),
					),
G
godchen 已提交
169 170 171
					grpc_opentracing.StreamClientInterceptor(opts...),
				)),
		)
172 173
		if err != nil {
			return err
S
sunby 已提交
174
		}
D
dragondriver 已提交
175 176 177
		if c.conn != nil {
			_ = c.conn.Close()
		}
178 179
		c.conn = conn
		return nil
S
sunby 已提交
180
	}
181

G
godchen 已提交
182
	err = retry.Do(c.ctx, connectDataCoordFn, retryOptions...)
S
sunby 已提交
183
	if err != nil {
184
		log.Debug("DataCoord try reconnect failed", zap.Error(err))
S
sunby 已提交
185 186
		return err
	}
187
	c.grpcClient = datapb.NewDataCoordClient(c.conn)
S
sunby 已提交
188 189 190
	return nil
}

G
godchen 已提交
191 192 193 194 195
func (c *Client) recall(caller func() (interface{}, error)) (interface{}, error) {
	ret, err := caller()
	if err == nil {
		return ret, nil
	}
196 197 198 199
	if err == context.Canceled || err == context.DeadlineExceeded {
		return nil, err
	}

N
neza2017 已提交
200
	log.Debug("DataCoord Client grpc error", zap.Error(err))
D
dragondriver 已提交
201 202 203

	c.resetConnection()

G
godchen 已提交
204
	ret, err = caller()
G
godchen 已提交
205 206 207
	return ret, err
}

208
// Start enables the client
S
sunby 已提交
209 210 211 212 213
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
G
godchen 已提交
214
	c.cancel()
D
dragondriver 已提交
215 216 217 218 219 220
	c.grpcClientMtx.Lock()
	defer c.grpcClientMtx.Unlock()
	if c.conn != nil {
		return c.conn.Close()
	}
	return nil
S
sunby 已提交
221 222
}

223 224 225 226 227
// Register dumy
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
228
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
G
godchen 已提交
229
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
230 231 232 233
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
234 235 236
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
237
		return client.GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
238
	})
D
dragondriver 已提交
239 240 241
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
242
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
243 244
}

245
// GetTimeTickChannel return the name of time tick channel.
G
godchen 已提交
246
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
247
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
248 249 250 251
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
252 253 254
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
255
		return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
256
	})
D
dragondriver 已提交
257 258 259
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
260
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
261 262
}

263
// GetStatisticsChannel return the name of statistics channel.
G
godchen 已提交
264
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
265
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
266 267 268 269
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
270 271 272
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
273
		return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
274
	})
D
dragondriver 已提交
275 276 277
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
278
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
279 280
}

281
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
G
godchen 已提交
282
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
283 284 285 286
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
287 288 289
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
290
		return client.Flush(ctx, req)
G
godchen 已提交
291
	})
D
dragondriver 已提交
292 293 294
	if err != nil || ret == nil {
		return nil, err
	}
295
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
296 297
}

298 299 300 301 302 303 304 305 306 307 308 309 310
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
// `AssignSegmentID` will applies current configured allocation policies for each request
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
G
godchen 已提交
311
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
G
godchen 已提交
312
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
313 314 315 316
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
317 318 319
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
320
		return client.AssignSegmentID(ctx, req)
G
godchen 已提交
321
	})
D
dragondriver 已提交
322 323 324
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
325
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
326 327
}

328 329 330 331 332 333 334 335 336
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment id to query
//
// response struct `GetSegmentStatesResponse` contains the list of each state query result
// 	when the segment is not found, the state entry will has the field `Status`  to identify failure
// 	otherwise the Segment State and Start position information will be returned
// error is returned only when some communication issue occurs
G
godchen 已提交
337
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
G
godchen 已提交
338
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
339 340 341 342
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
343 344 345
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
346
		return client.GetSegmentStates(ctx, req)
G
godchen 已提交
347
	})
D
dragondriver 已提交
348 349 350
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
351
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
352 353
}

354 355 356 357 358 359 360 361
// GetInsertBinlogPaths requests binlog paths for specified segment
//
// ctx is the context to control request deadline and cancellation
// req contains the segment id to query
//
// response struct `GetInsertBinlogPathsResponse` contains the fields list
// 	and corresponding binlog path list
// error is returned only when some communication issue occurs
G
godchen 已提交
362
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
G
godchen 已提交
363
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
364 365 366 367
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
368 369 370
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
371
		return client.GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
372
	})
D
dragondriver 已提交
373 374 375
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
376
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
377 378
}

379 380 381 382 383 384 385 386
// GetCollectionStatistics requests collection statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection id to query
//
// response struct `GetCollectionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
387
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
G
godchen 已提交
388
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
389 390 391 392
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
393 394 395
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
396
		return client.GetCollectionStatistics(ctx, req)
G
godchen 已提交
397
	})
D
dragondriver 已提交
398 399 400
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
401
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
402 403
}

404 405 406 407 408 409 410 411
// GetPartitionStatistics requests partition statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection and partition id to query
//
// response struct `GetPartitionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
412
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
G
godchen 已提交
413
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
414 415 416 417
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
418 419 420
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
421
		return client.GetPartitionStatistics(ctx, req)
G
godchen 已提交
422
	})
D
dragondriver 已提交
423 424 425
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
426
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
427
}
N
neza2017 已提交
428

429 430
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
G
godchen 已提交
431
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
G
godchen 已提交
432
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
433 434 435 436
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
437 438 439
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
440
		return client.GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
441
	})
D
dragondriver 已提交
442 443 444
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
445
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
446
}
447

448 449 450 451 452 453 454
// GetSegmentInfo requests segment info
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `GetSegmentInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
G
godchen 已提交
455
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
G
godchen 已提交
456
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
457 458 459 460
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
461 462 463
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
464
		return client.GetSegmentInfo(ctx, req)
G
godchen 已提交
465
	})
D
dragondriver 已提交
466 467 468
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
469
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
470
}
471

472 473 474 475 476 477 478 479 480 481 482 483
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
//  and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
//
// there is a constraint that the `SaveBinlogPaths` requests of same segment shall be passed in sequence
// 	the root reason is each `SaveBinlogPaths` will overwrite the checkpoint position
//  if the constraint is broken, the checkpoint position will not be monotonically increasing and the integrity will be compromised
484
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
D
dragondriver 已提交
485 486 487 488 489 490 491 492 493 494
	// FIXME(dragondriver): why not to recall here?
	client, err := c.getGrpcClient()
	if err != nil {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	return client.SaveBinlogPaths(ctx, req)
495 496
}

497 498 499 500 501 502 503
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response struct `GetRecoveryInfoResponse` contains the list of segments info and corresponding vchannel info
// error is returned only when some communication issue occurs
504 505
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
506 507 508 509
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
510 511 512
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
513
		return client.GetRecoveryInfo(ctx, req)
514
	})
D
dragondriver 已提交
515 516 517
	if err != nil || ret == nil {
		return nil, err
	}
518 519
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
520

521 522 523 524 525 526 527 528
// GetFlushedSegments returns flushed segment list of requested collection/parition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//  when partition is lesser or equal to 0, all flushed segments of collection will be returned
//
// response struct `GetFlushedSegmentsResponse` contains flushed segment id list
// error is returned only when some communication issue occurs
529 530
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
531 532 533 534
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
535 536 537
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
538
		return client.GetFlushedSegments(ctx, req)
539
	})
D
dragondriver 已提交
540 541 542
	if err != nil || ret == nil {
		return nil, err
	}
543 544
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
545

546
// GetMetrics gets all metrics of datacoord
547 548
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
D
dragondriver 已提交
549 550 551 552
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
553 554 555
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
D
dragondriver 已提交
556
		return client.GetMetrics(ctx, req)
557
	})
D
dragondriver 已提交
558 559 560
	if err != nil || ret == nil {
		return nil, err
	}
561 562
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
563 564 565 566 567 568 569

func (c *Client) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
570 571 572
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
S
sunby 已提交
573 574 575 576 577 578 579 580
		return client.CompleteCompaction(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}

581
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
S
sunby 已提交
582 583 584 585 586
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
587 588 589
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
S
sunby 已提交
590 591 592 593 594
		return client.ManualCompaction(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
595
	return ret.(*milvuspb.ManualCompactionResponse), err
S
sunby 已提交
596 597
}

598
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
S
sunby 已提交
599 600 601 602 603
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
604 605 606
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
S
sunby 已提交
607 608 609 610 611
		return client.GetCompactionState(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
612 613 614 615 616 617 618 619 620
	return ret.(*milvuspb.GetCompactionStateResponse), err
}

func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
621 622 623
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
624 625 626 627 628 629
		return client.GetCompactionStateWithPlans(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetCompactionPlansResponse), err
S
sunby 已提交
630
}
G
godchen 已提交
631 632 633 634 635 636 637

func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
	ret, err := c.recall(func() (interface{}, error) {
		client, err := c.getGrpcClient()
		if err != nil {
			return nil, err
		}
638 639 640
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
G
godchen 已提交
641 642 643 644 645 646 647
		return client.WatchChannels(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.WatchChannelsResponse), err
}