client.go 18.5 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
C
cai.zhang 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
C
cai.zhang 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
C
cai.zhang 已提交
16

17
package grpcdatacoordclient
S
sunby 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
S
sunby 已提交
22

23
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
24 25 26
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
27 28 29
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/grpcclient"
30
	"github.com/milvus-io/milvus/internal/util/paramtable"
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
Xiaofan 已提交
33
	clientv3 "go.etcd.io/etcd/client/v3"
34 35
	"go.uber.org/zap"
	"google.golang.org/grpc"
S
sunby 已提交
36 37
)

38
// ClientParams is the parameters of client singleton
39
var ClientParams paramtable.GrpcClientConfig
40

41
// Client is the datacoord grpc client
S
sunby 已提交
42
type Client struct {
43 44
	grpcClient grpcclient.GrpcClient
	sess       *sessionutil.Session
G
godchen 已提交
45 46
}

47
// NewClient creates a new client instance
X
Xiaofan 已提交
48 49
func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (*Client, error) {
	sess := sessionutil.NewSession(ctx, metaRoot, etcdCli)
50 51 52
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
D
dragondriver 已提交
53 54
		return nil, err
	}
55
	ClientParams.InitOnce(typeutil.DataCoordRole)
56 57
	client := &Client{
		grpcClient: &grpcclient.ClientBase{
58 59
			ClientMaxRecvSize: ClientParams.ClientMaxRecvSize,
			ClientMaxSendSize: ClientParams.ClientMaxSendSize,
60 61 62 63 64 65
		},
		sess: sess,
	}
	client.grpcClient.SetRole(typeutil.DataCoordRole)
	client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr)
	client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
D
dragondriver 已提交
66

67
	return client, nil
D
dragondriver 已提交
68 69
}

70 71
func (c *Client) newGrpcClient(cc *grpc.ClientConn) interface{} {
	return datapb.NewDataCoordClient(cc)
D
dragondriver 已提交
72 73
}

74 75 76
func (c *Client) getDataCoordAddr() (string, error) {
	key := c.grpcClient.GetRole()
	msess, _, err := c.sess.GetSessions(key)
G
godchen 已提交
77
	if err != nil {
78
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
79 80 81 82
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
83 84
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
85 86
	}
	return ms.Address, nil
S
sunby 已提交
87 88
}

S
sunby 已提交
89
// Init initializes the client
S
sunby 已提交
90 91 92 93
func (c *Client) Init() error {
	return nil
}

94
// Start enables the client
S
sunby 已提交
95 96 97 98
func (c *Client) Start() error {
	return nil
}

99
// Stop stops the client
S
sunby 已提交
100
func (c *Client) Stop() error {
101
	return c.grpcClient.Close()
S
sunby 已提交
102 103
}

104
// Register dummy
105 106 107 108
func (c *Client) Register() error {
	return nil
}

109
// GetComponentStates calls DataCoord GetComponentStates services
G
godchen 已提交
110
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
111
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
112 113 114
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
115
		return client.(datapb.DataCoordClient).GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
116
	})
D
dragondriver 已提交
117 118 119
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
120
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
121 122
}

123
// GetTimeTickChannel return the name of time tick channel.
G
godchen 已提交
124
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
125
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
126 127 128
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
129
		return client.(datapb.DataCoordClient).GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
130
	})
D
dragondriver 已提交
131 132 133
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
134
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
135 136
}

137
// GetStatisticsChannel return the name of statistics channel.
G
godchen 已提交
138
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
139
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
140 141 142
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
143
		return client.(datapb.DataCoordClient).GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
144
	})
D
dragondriver 已提交
145 146 147
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
148
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
149 150
}

151
// Flush flushes a collection's data
152
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
153
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
154 155 156
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
157
		return client.(datapb.DataCoordClient).Flush(ctx, req)
G
godchen 已提交
158
	})
D
dragondriver 已提交
159 160 161
	if err != nil || ret == nil {
		return nil, err
	}
162
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
163 164
}

165 166 167 168 169 170 171 172 173 174 175 176 177
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
// `AssignSegmentID` will applies current configured allocation policies for each request
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
G
godchen 已提交
178
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
179
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
180 181 182
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
183
		return client.(datapb.DataCoordClient).AssignSegmentID(ctx, req)
G
godchen 已提交
184
	})
D
dragondriver 已提交
185 186 187
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
188
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
189 190
}

191 192 193 194 195 196 197 198 199
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment id to query
//
// response struct `GetSegmentStatesResponse` contains the list of each state query result
// 	when the segment is not found, the state entry will has the field `Status`  to identify failure
// 	otherwise the Segment State and Start position information will be returned
// error is returned only when some communication issue occurs
G
godchen 已提交
200
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
201
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
202 203 204
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
205
		return client.(datapb.DataCoordClient).GetSegmentStates(ctx, req)
G
godchen 已提交
206
	})
D
dragondriver 已提交
207 208 209
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
210
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
211 212
}

213 214 215 216 217 218 219 220
// GetInsertBinlogPaths requests binlog paths for specified segment
//
// ctx is the context to control request deadline and cancellation
// req contains the segment id to query
//
// response struct `GetInsertBinlogPathsResponse` contains the fields list
// 	and corresponding binlog path list
// error is returned only when some communication issue occurs
G
godchen 已提交
221
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
222
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
223 224 225
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
226
		return client.(datapb.DataCoordClient).GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
227
	})
D
dragondriver 已提交
228 229 230
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
231
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
232 233
}

234 235 236 237 238 239 240 241
// GetCollectionStatistics requests collection statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection id to query
//
// response struct `GetCollectionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
242
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
243
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
244 245 246
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
247
		return client.(datapb.DataCoordClient).GetCollectionStatistics(ctx, req)
G
godchen 已提交
248
	})
D
dragondriver 已提交
249 250 251
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
252
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
253 254
}

255 256 257 258 259 260 261 262
// GetPartitionStatistics requests partition statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection and partition id to query
//
// response struct `GetPartitionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
263
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
264
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
265 266 267
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
268
		return client.(datapb.DataCoordClient).GetPartitionStatistics(ctx, req)
G
godchen 已提交
269
	})
D
dragondriver 已提交
270 271 272
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
273
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
274
}
N
neza2017 已提交
275

276 277
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
G
godchen 已提交
278
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
279
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
280 281 282
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
283
		return client.(datapb.DataCoordClient).GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
284
	})
D
dragondriver 已提交
285 286 287
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
288
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
289
}
290

291 292 293 294 295 296 297
// GetSegmentInfo requests segment info
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `GetSegmentInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
G
godchen 已提交
298
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
299
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
300 301 302
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
303
		return client.(datapb.DataCoordClient).GetSegmentInfo(ctx, req)
G
godchen 已提交
304
	})
D
dragondriver 已提交
305 306 307
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
308
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
309
}
310

311 312 313 314 315 316 317 318 319 320 321 322
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
//  and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
//
// there is a constraint that the `SaveBinlogPaths` requests of same segment shall be passed in sequence
// 	the root reason is each `SaveBinlogPaths` will overwrite the checkpoint position
//  if the constraint is broken, the checkpoint position will not be monotonically increasing and the integrity will be compromised
323
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
324 325 326 327 328 329 330 331 332
	// use Call here on purpose
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		return client.(datapb.DataCoordClient).SaveBinlogPaths(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
D
dragondriver 已提交
333
	}
334
	return ret.(*commonpb.Status), err
335 336
}

337 338 339 340 341 342 343
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response struct `GetRecoveryInfoResponse` contains the list of segments info and corresponding vchannel info
// error is returned only when some communication issue occurs
344
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
345
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
346 347 348
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
349
		return client.(datapb.DataCoordClient).GetRecoveryInfo(ctx, req)
350
	})
D
dragondriver 已提交
351 352 353
	if err != nil || ret == nil {
		return nil, err
	}
354 355
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
356

357 358 359 360 361 362 363 364
// GetFlushedSegments returns flushed segment list of requested collection/parition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//  when partition is lesser or equal to 0, all flushed segments of collection will be returned
//
// response struct `GetFlushedSegmentsResponse` contains flushed segment id list
// error is returned only when some communication issue occurs
365
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
366
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
367 368 369
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
370
		return client.(datapb.DataCoordClient).GetFlushedSegments(ctx, req)
371
	})
D
dragondriver 已提交
372 373 374
	if err != nil || ret == nil {
		return nil, err
	}
375 376
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
377

378
// GetMetrics gets all metrics of datacoord
379
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
380
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
381 382 383
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
384
		return client.(datapb.DataCoordClient).GetMetrics(ctx, req)
385
	})
D
dragondriver 已提交
386 387 388
	if err != nil || ret == nil {
		return nil, err
	}
389 390
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
391

392
// CompleteCompaction completes a compaction with the result
S
sunby 已提交
393
func (c *Client) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
394
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
395 396 397
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
398
		return client.(datapb.DataCoordClient).CompleteCompaction(ctx, req)
S
sunby 已提交
399 400 401 402 403 404 405
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}

406
// ManualCompaction triggers a compaction for a collection
407
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
408
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
409 410 411
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
412
		return client.(datapb.DataCoordClient).ManualCompaction(ctx, req)
S
sunby 已提交
413 414 415 416
	})
	if err != nil || ret == nil {
		return nil, err
	}
417
	return ret.(*milvuspb.ManualCompactionResponse), err
S
sunby 已提交
418 419
}

420
// GetCompactionState gets the state of a compaction
421
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
422
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
423 424 425
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
426
		return client.(datapb.DataCoordClient).GetCompactionState(ctx, req)
S
sunby 已提交
427 428 429 430
	})
	if err != nil || ret == nil {
		return nil, err
	}
431 432 433
	return ret.(*milvuspb.GetCompactionStateResponse), err
}

434
// GetCompactionStateWithPlans gets the state of a compaction by plan
435
func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
436
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
437 438 439
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
440
		return client.(datapb.DataCoordClient).GetCompactionStateWithPlans(ctx, req)
441 442 443 444 445
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetCompactionPlansResponse), err
S
sunby 已提交
446
}
G
godchen 已提交
447

448
// WatchChannels notifies DataCoord to watch vchannels of a collection
G
godchen 已提交
449
func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
450
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
451 452 453
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
454
		return client.(datapb.DataCoordClient).WatchChannels(ctx, req)
G
godchen 已提交
455 456 457 458 459 460
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.WatchChannelsResponse), err
}
B
Bingyi Sun 已提交
461 462 463

// GetFlushState gets the flush state of multiple segments
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
464
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
B
Bingyi Sun 已提交
465 466 467
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
468
		return client.(datapb.DataCoordClient).GetFlushState(ctx, req)
B
Bingyi Sun 已提交
469 470 471 472 473 474
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetFlushStateResponse), err
}
475 476 477

// DropVirtualChannel drops virtual channel in datacoord.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
478
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
479 480 481
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
482
		return client.(datapb.DataCoordClient).DropVirtualChannel(ctx, req)
483 484 485 486 487 488
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.DropVirtualChannelResponse), err
}