client.go 18.1 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
C
cai.zhang 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
C
cai.zhang 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
C
cai.zhang 已提交
16

17
package grpcdatacoordclient
S
sunby 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
S
sunby 已提交
22

23
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
24 25 26
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
27 28 29
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/grpcclient"
30
	"github.com/milvus-io/milvus/internal/util/paramtable"
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
Xiaofan 已提交
33
	clientv3 "go.etcd.io/etcd/client/v3"
34 35
	"go.uber.org/zap"
	"google.golang.org/grpc"
S
sunby 已提交
36 37
)

38
// ClientParams holds the gRPC client configuration for this package.
// NewClient initializes it once for the DataCoord role and reads its
// ClientMaxRecvSize / ClientMaxSendSize values when building the client.
var ClientParams paramtable.GrpcClientConfig
39

40
// Client is the datacoord grpc client
S
sunby 已提交
41
type Client struct {
42 43
	grpcClient grpcclient.GrpcClient
	sess       *sessionutil.Session
G
godchen 已提交
44 45
}

46
// NewClient creates a new client instance
X
Xiaofan 已提交
47 48
func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (*Client, error) {
	sess := sessionutil.NewSession(ctx, metaRoot, etcdCli)
49 50 51
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
D
dragondriver 已提交
52 53
		return nil, err
	}
54
	ClientParams.InitOnce(typeutil.DataCoordRole)
55 56
	client := &Client{
		grpcClient: &grpcclient.ClientBase{
57 58
			ClientMaxRecvSize: ClientParams.ClientMaxRecvSize,
			ClientMaxSendSize: ClientParams.ClientMaxSendSize,
59 60 61 62 63 64
		},
		sess: sess,
	}
	client.grpcClient.SetRole(typeutil.DataCoordRole)
	client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr)
	client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
D
dragondriver 已提交
65

66
	return client, nil
D
dragondriver 已提交
67 68
}

69 70
func (c *Client) newGrpcClient(cc *grpc.ClientConn) interface{} {
	return datapb.NewDataCoordClient(cc)
D
dragondriver 已提交
71 72
}

73 74 75
func (c *Client) getDataCoordAddr() (string, error) {
	key := c.grpcClient.GetRole()
	msess, _, err := c.sess.GetSessions(key)
G
godchen 已提交
76
	if err != nil {
77
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
78 79 80 81
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
82 83
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
84 85
	}
	return ms.Address, nil
S
sunby 已提交
86 87
}

S
sunby 已提交
88
// Init initializes the client
S
sunby 已提交
89 90 91 92
func (c *Client) Init() error {
	return nil
}

93
// Start enables the client
S
sunby 已提交
94 95 96 97
func (c *Client) Start() error {
	return nil
}

98
// Stop stops the client
S
sunby 已提交
99
func (c *Client) Stop() error {
100
	return c.grpcClient.Close()
S
sunby 已提交
101 102
}

103
// Register dummy
104 105 106 107
func (c *Client) Register() error {
	return nil
}

108
// GetComponentStates calls DataCoord GetComponentStates services
G
godchen 已提交
109
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
110
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
111 112 113
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
114
		return client.(datapb.DataCoordClient).GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
115
	})
D
dragondriver 已提交
116 117 118
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
119
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
120 121
}

122
// GetTimeTickChannel return the name of time tick channel.
G
godchen 已提交
123
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
124
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
125 126 127
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
128
		return client.(datapb.DataCoordClient).GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
129
	})
D
dragondriver 已提交
130 131 132
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
133
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
134 135
}

136
// GetStatisticsChannel return the name of statistics channel.
G
godchen 已提交
137
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
138
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
139 140 141
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
142
		return client.(datapb.DataCoordClient).GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
143
	})
D
dragondriver 已提交
144 145 146
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
147
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
148 149
}

150
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
151
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
152 153 154
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
155
		return client.(datapb.DataCoordClient).Flush(ctx, req)
G
godchen 已提交
156
	})
D
dragondriver 已提交
157 158 159
	if err != nil || ret == nil {
		return nil, err
	}
160
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
161 162
}

163 164 165 166 167 168 169 170 171 172 173 174 175
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
// `AssignSegmentID` will applies current configured allocation policies for each request
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
G
godchen 已提交
176
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
177
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
178 179 180
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
181
		return client.(datapb.DataCoordClient).AssignSegmentID(ctx, req)
G
godchen 已提交
182
	})
D
dragondriver 已提交
183 184 185
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
186
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
187 188
}

189 190 191 192 193 194 195 196 197
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment id to query
//
// response struct `GetSegmentStatesResponse` contains the list of each state query result
// 	when the segment is not found, the state entry will has the field `Status`  to identify failure
// 	otherwise the Segment State and Start position information will be returned
// error is returned only when some communication issue occurs
G
godchen 已提交
198
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
199
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
200 201 202
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
203
		return client.(datapb.DataCoordClient).GetSegmentStates(ctx, req)
G
godchen 已提交
204
	})
D
dragondriver 已提交
205 206 207
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
208
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
209 210
}

211 212 213 214 215 216 217 218
// GetInsertBinlogPaths requests binlog paths for specified segment
//
// ctx is the context to control request deadline and cancellation
// req contains the segment id to query
//
// response struct `GetInsertBinlogPathsResponse` contains the fields list
// 	and corresponding binlog path list
// error is returned only when some communication issue occurs
G
godchen 已提交
219
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
220
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
221 222 223
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
224
		return client.(datapb.DataCoordClient).GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
225
	})
D
dragondriver 已提交
226 227 228
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
229
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
230 231
}

232 233 234 235 236 237 238 239
// GetCollectionStatistics requests collection statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection id to query
//
// response struct `GetCollectionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
240
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
241
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
242 243 244
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
245
		return client.(datapb.DataCoordClient).GetCollectionStatistics(ctx, req)
G
godchen 已提交
246
	})
D
dragondriver 已提交
247 248 249
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
250
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
251 252
}

253 254 255 256 257 258 259 260
// GetPartitionStatistics requests partition statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection and partition id to query
//
// response struct `GetPartitionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
261
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
262
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
263 264 265
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
266
		return client.(datapb.DataCoordClient).GetPartitionStatistics(ctx, req)
G
godchen 已提交
267
	})
D
dragondriver 已提交
268 269 270
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
271
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
272
}
N
neza2017 已提交
273

274 275
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
G
godchen 已提交
276
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
277
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
278 279 280
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
281
		return client.(datapb.DataCoordClient).GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
282
	})
D
dragondriver 已提交
283 284 285
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
286
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
287
}
288

289 290 291 292 293 294 295
// GetSegmentInfo requests segment info
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `GetSegmentInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
G
godchen 已提交
296
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
297
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
298 299 300
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
301
		return client.(datapb.DataCoordClient).GetSegmentInfo(ctx, req)
G
godchen 已提交
302
	})
D
dragondriver 已提交
303 304 305
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
306
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
307
}
308

309 310 311 312 313 314 315 316 317 318 319 320
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
//  and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
//
// there is a constraint that the `SaveBinlogPaths` requests of same segment shall be passed in sequence
// 	the root reason is each `SaveBinlogPaths` will overwrite the checkpoint position
//  if the constraint is broken, the checkpoint position will not be monotonically increasing and the integrity will be compromised
321
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
322 323 324 325 326 327 328 329 330
	// use Call here on purpose
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		return client.(datapb.DataCoordClient).SaveBinlogPaths(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
D
dragondriver 已提交
331
	}
332
	return ret.(*commonpb.Status), err
333 334
}

335 336 337 338 339 340 341
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response struct `GetRecoveryInfoResponse` contains the list of segments info and corresponding vchannel info
// error is returned only when some communication issue occurs
342
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
343
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
344 345 346
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
347
		return client.(datapb.DataCoordClient).GetRecoveryInfo(ctx, req)
348
	})
D
dragondriver 已提交
349 350 351
	if err != nil || ret == nil {
		return nil, err
	}
352 353
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
354

355 356 357 358 359 360 361 362
// GetFlushedSegments returns flushed segment list of requested collection/parition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//  when partition is lesser or equal to 0, all flushed segments of collection will be returned
//
// response struct `GetFlushedSegmentsResponse` contains flushed segment id list
// error is returned only when some communication issue occurs
363
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
364
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
365 366 367
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
368
		return client.(datapb.DataCoordClient).GetFlushedSegments(ctx, req)
369
	})
D
dragondriver 已提交
370 371 372
	if err != nil || ret == nil {
		return nil, err
	}
373 374
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
375

376
// GetMetrics gets all metrics of datacoord
377
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
378
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
379 380 381
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
382
		return client.(datapb.DataCoordClient).GetMetrics(ctx, req)
383
	})
D
dragondriver 已提交
384 385 386
	if err != nil || ret == nil {
		return nil, err
	}
387 388
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
389 390

func (c *Client) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
391
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
392 393 394
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
395
		return client.(datapb.DataCoordClient).CompleteCompaction(ctx, req)
S
sunby 已提交
396 397 398 399 400 401 402
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}

403
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
404
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
405 406 407
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
408
		return client.(datapb.DataCoordClient).ManualCompaction(ctx, req)
S
sunby 已提交
409 410 411 412
	})
	if err != nil || ret == nil {
		return nil, err
	}
413
	return ret.(*milvuspb.ManualCompactionResponse), err
S
sunby 已提交
414 415
}

416
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
417
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
418 419 420
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
421
		return client.(datapb.DataCoordClient).GetCompactionState(ctx, req)
S
sunby 已提交
422 423 424 425
	})
	if err != nil || ret == nil {
		return nil, err
	}
426 427 428 429
	return ret.(*milvuspb.GetCompactionStateResponse), err
}

func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
430
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
431 432 433
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
434
		return client.(datapb.DataCoordClient).GetCompactionStateWithPlans(ctx, req)
435 436 437 438 439
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetCompactionPlansResponse), err
S
sunby 已提交
440
}
G
godchen 已提交
441

442
// WatchChannels notifies DataCoord to watch vchannels of a collection
G
godchen 已提交
443
func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
444
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
445 446 447
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
448
		return client.(datapb.DataCoordClient).WatchChannels(ctx, req)
G
godchen 已提交
449 450 451 452 453 454
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.WatchChannelsResponse), err
}
B
Bingyi Sun 已提交
455 456 457

// GetFlushState gets the flush state of multiple segments
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
458
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
B
Bingyi Sun 已提交
459 460 461
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
462
		return client.(datapb.DataCoordClient).GetFlushState(ctx, req)
B
Bingyi Sun 已提交
463 464 465 466 467 468
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetFlushStateResponse), err
}
469 470 471

// DropVirtualChannel drops virtual channel in datacoord.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
472
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
473 474 475
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
476
		return client.(datapb.DataCoordClient).DropVirtualChannel(ctx, req)
477 478 479 480 481 482
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.DropVirtualChannelResponse), err
}