client.go 18.1 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
C
cai.zhang 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
C
cai.zhang 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
C
cai.zhang 已提交
16

17
package grpcdatacoordclient
S
sunby 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
S
sunby 已提交
22

23
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
24 25 26
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
27 28 29
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/grpcclient"
30
	"github.com/milvus-io/milvus/internal/util/paramtable"
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
33 34
	"go.uber.org/zap"
	"google.golang.org/grpc"
S
sunby 已提交
35 36
)

37
var ClientParams paramtable.GrpcClientConfig
38

39
// Client is the datacoord grpc client
S
sunby 已提交
40
type Client struct {
41 42
	grpcClient grpcclient.GrpcClient
	sess       *sessionutil.Session
G
godchen 已提交
43 44
}

45 46 47 48 49 50
// NewClient creates a new client instance
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*Client, error) {
	sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
D
dragondriver 已提交
51 52
		return nil, err
	}
53
	ClientParams.InitOnce(typeutil.DataCoordRole)
54 55
	client := &Client{
		grpcClient: &grpcclient.ClientBase{
56 57
			ClientMaxRecvSize: ClientParams.ClientMaxRecvSize,
			ClientMaxSendSize: ClientParams.ClientMaxSendSize,
58 59 60 61 62 63
		},
		sess: sess,
	}
	client.grpcClient.SetRole(typeutil.DataCoordRole)
	client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr)
	client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
D
dragondriver 已提交
64

65
	return client, nil
D
dragondriver 已提交
66 67
}

68 69
func (c *Client) newGrpcClient(cc *grpc.ClientConn) interface{} {
	return datapb.NewDataCoordClient(cc)
D
dragondriver 已提交
70 71
}

72 73 74
func (c *Client) getDataCoordAddr() (string, error) {
	key := c.grpcClient.GetRole()
	msess, _, err := c.sess.GetSessions(key)
G
godchen 已提交
75
	if err != nil {
76
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
77 78 79 80
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
81 82
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
83 84
	}
	return ms.Address, nil
S
sunby 已提交
85 86
}

S
sunby 已提交
87
// Init initializes the client
S
sunby 已提交
88 89 90 91
func (c *Client) Init() error {
	return nil
}

92
// Start enables the client
S
sunby 已提交
93 94 95 96 97
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
98
	return c.grpcClient.Close()
S
sunby 已提交
99 100
}

101
// Register dummy
102 103 104 105
func (c *Client) Register() error {
	return nil
}

106
// GetComponentStates calls DataCoord GetComponentStates services
G
godchen 已提交
107
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
108
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
109 110 111
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
112
		return client.(datapb.DataCoordClient).GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
113
	})
D
dragondriver 已提交
114 115 116
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
117
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
118 119
}

120
// GetTimeTickChannel return the name of time tick channel.
G
godchen 已提交
121
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
122
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
123 124 125
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
126
		return client.(datapb.DataCoordClient).GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
127
	})
D
dragondriver 已提交
128 129 130
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
131
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
132 133
}

134
// GetStatisticsChannel return the name of statistics channel.
G
godchen 已提交
135
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
136
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
137 138 139
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
140
		return client.(datapb.DataCoordClient).GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
141
	})
D
dragondriver 已提交
142 143 144
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
145
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
146 147
}

148
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
149
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
150 151 152
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
153
		return client.(datapb.DataCoordClient).Flush(ctx, req)
G
godchen 已提交
154
	})
D
dragondriver 已提交
155 156 157
	if err != nil || ret == nil {
		return nil, err
	}
158
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
159 160
}

161 162 163 164 165 166 167 168 169 170 171 172 173
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
// `AssignSegmentID` will applies current configured allocation policies for each request
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
G
godchen 已提交
174
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
175
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
176 177 178
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
179
		return client.(datapb.DataCoordClient).AssignSegmentID(ctx, req)
G
godchen 已提交
180
	})
D
dragondriver 已提交
181 182 183
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
184
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
185 186
}

187 188 189 190 191 192 193 194 195
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment id to query
//
// response struct `GetSegmentStatesResponse` contains the list of each state query result
// 	when the segment is not found, the state entry will has the field `Status`  to identify failure
// 	otherwise the Segment State and Start position information will be returned
// error is returned only when some communication issue occurs
G
godchen 已提交
196
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
197
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
198 199 200
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
201
		return client.(datapb.DataCoordClient).GetSegmentStates(ctx, req)
G
godchen 已提交
202
	})
D
dragondriver 已提交
203 204 205
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
206
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
207 208
}

209 210 211 212 213 214 215 216
// GetInsertBinlogPaths requests binlog paths for specified segment
//
// ctx is the context to control request deadline and cancellation
// req contains the segment id to query
//
// response struct `GetInsertBinlogPathsResponse` contains the fields list
// 	and corresponding binlog path list
// error is returned only when some communication issue occurs
G
godchen 已提交
217
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
218
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
219 220 221
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
222
		return client.(datapb.DataCoordClient).GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
223
	})
D
dragondriver 已提交
224 225 226
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
227
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
228 229
}

230 231 232 233 234 235 236 237
// GetCollectionStatistics requests collection statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection id to query
//
// response struct `GetCollectionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
238
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
239
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
240 241 242
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
243
		return client.(datapb.DataCoordClient).GetCollectionStatistics(ctx, req)
G
godchen 已提交
244
	})
D
dragondriver 已提交
245 246 247
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
248
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
249 250
}

251 252 253 254 255 256 257 258
// GetPartitionStatistics requests partition statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection and partition id to query
//
// response struct `GetPartitionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
259
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
260
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
261 262 263
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
264
		return client.(datapb.DataCoordClient).GetPartitionStatistics(ctx, req)
G
godchen 已提交
265
	})
D
dragondriver 已提交
266 267 268
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
269
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
270
}
N
neza2017 已提交
271

272 273
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
G
godchen 已提交
274
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
275
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
276 277 278
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
279
		return client.(datapb.DataCoordClient).GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
280
	})
D
dragondriver 已提交
281 282 283
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
284
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
285
}
286

287 288 289 290 291 292 293
// GetSegmentInfo requests segment info
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `GetSegmentInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
G
godchen 已提交
294
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
295
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
296 297 298
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
299
		return client.(datapb.DataCoordClient).GetSegmentInfo(ctx, req)
G
godchen 已提交
300
	})
D
dragondriver 已提交
301 302 303
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
304
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
305
}
306

307 308 309 310 311 312 313 314 315 316 317 318
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
//  and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
//
// there is a constraint that the `SaveBinlogPaths` requests of same segment shall be passed in sequence
// 	the root reason is each `SaveBinlogPaths` will overwrite the checkpoint position
//  if the constraint is broken, the checkpoint position will not be monotonically increasing and the integrity will be compromised
319
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
320 321 322 323 324 325 326 327 328
	// use Call here on purpose
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		return client.(datapb.DataCoordClient).SaveBinlogPaths(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
D
dragondriver 已提交
329
	}
330
	return ret.(*commonpb.Status), err
331 332
}

333 334 335 336 337 338 339
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response struct `GetRecoveryInfoResponse` contains the list of segments info and corresponding vchannel info
// error is returned only when some communication issue occurs
340
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
341
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
342 343 344
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
345
		return client.(datapb.DataCoordClient).GetRecoveryInfo(ctx, req)
346
	})
D
dragondriver 已提交
347 348 349
	if err != nil || ret == nil {
		return nil, err
	}
350 351
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
352

353 354 355 356 357 358 359 360
// GetFlushedSegments returns flushed segment list of requested collection/parition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//  when partition is lesser or equal to 0, all flushed segments of collection will be returned
//
// response struct `GetFlushedSegmentsResponse` contains flushed segment id list
// error is returned only when some communication issue occurs
361
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
362
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
363 364 365
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
366
		return client.(datapb.DataCoordClient).GetFlushedSegments(ctx, req)
367
	})
D
dragondriver 已提交
368 369 370
	if err != nil || ret == nil {
		return nil, err
	}
371 372
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
373

374
// GetMetrics gets all metrics of datacoord
375
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
376
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
377 378 379
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
380
		return client.(datapb.DataCoordClient).GetMetrics(ctx, req)
381
	})
D
dragondriver 已提交
382 383 384
	if err != nil || ret == nil {
		return nil, err
	}
385 386
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
387 388

func (c *Client) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
389
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
390 391 392
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
393
		return client.(datapb.DataCoordClient).CompleteCompaction(ctx, req)
S
sunby 已提交
394 395 396 397 398 399 400
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}

401
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
402
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
403 404 405
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
406
		return client.(datapb.DataCoordClient).ManualCompaction(ctx, req)
S
sunby 已提交
407 408 409 410
	})
	if err != nil || ret == nil {
		return nil, err
	}
411
	return ret.(*milvuspb.ManualCompactionResponse), err
S
sunby 已提交
412 413
}

414
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
415
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
416 417 418
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
419
		return client.(datapb.DataCoordClient).GetCompactionState(ctx, req)
S
sunby 已提交
420 421 422 423
	})
	if err != nil || ret == nil {
		return nil, err
	}
424 425 426 427
	return ret.(*milvuspb.GetCompactionStateResponse), err
}

func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
428
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
429 430 431
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
432
		return client.(datapb.DataCoordClient).GetCompactionStateWithPlans(ctx, req)
433 434 435 436 437
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetCompactionPlansResponse), err
S
sunby 已提交
438
}
G
godchen 已提交
439

440
// WatchChannels notifies DataCoord to watch vchannels of a collection
G
godchen 已提交
441
func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
442
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
443 444 445
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
446
		return client.(datapb.DataCoordClient).WatchChannels(ctx, req)
G
godchen 已提交
447 448 449 450 451 452
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.WatchChannelsResponse), err
}
B
Bingyi Sun 已提交
453 454 455

// GetFlushState gets the flush state of multiple segments
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
456
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
B
Bingyi Sun 已提交
457 458 459
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
460
		return client.(datapb.DataCoordClient).GetFlushState(ctx, req)
B
Bingyi Sun 已提交
461 462 463 464 465 466
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetFlushStateResponse), err
}
467 468 469

// DropVirtualChannel drops virtual channel in datacoord.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
470
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
471 472 473
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
474
		return client.(datapb.DataCoordClient).DropVirtualChannel(ctx, req)
475 476 477 478 479 480
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.DropVirtualChannelResponse), err
}