client.go 18.0 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
C
cai.zhang 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
C
cai.zhang 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
C
cai.zhang 已提交
16

17
package grpcdatacoordclient
S
sunby 已提交
18 19 20

import (
	"context"
G
godchen 已提交
21
	"fmt"
S
sunby 已提交
22

G
godchen 已提交
23
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
24
	"go.uber.org/zap"
S
sunby 已提交
25
	"google.golang.org/grpc"
S
sunby 已提交
26

27
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
28 29 30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
31 32 33 34
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/grpcclient"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
S
sunby 已提交
35 36
)

37
// Client is the datacoord grpc client
S
sunby 已提交
38
type Client struct {
39 40
	grpcClient grpcclient.GrpcClient
	sess       *sessionutil.Session
G
godchen 已提交
41 42
}

43 44 45 46 47 48
// NewClient creates a new client instance
func NewClient(ctx context.Context, metaRoot string, etcdEndpoints []string) (*Client, error) {
	sess := sessionutil.NewSession(ctx, metaRoot, etcdEndpoints)
	if sess == nil {
		err := fmt.Errorf("new session error, maybe can not connect to etcd")
		log.Debug("DataCoordClient NewClient failed", zap.Error(err))
D
dragondriver 已提交
49 50
		return nil, err
	}
51 52 53 54 55 56 57 58 59 60 61
	Params.Init()
	client := &Client{
		grpcClient: &grpcclient.ClientBase{
			ClientMaxRecvSize: Params.ClientMaxRecvSize,
			ClientMaxSendSize: Params.ClientMaxSendSize,
		},
		sess: sess,
	}
	client.grpcClient.SetRole(typeutil.DataCoordRole)
	client.grpcClient.SetGetAddrFunc(client.getDataCoordAddr)
	client.grpcClient.SetNewGrpcClientFunc(client.newGrpcClient)
D
dragondriver 已提交
62

63
	return client, nil
D
dragondriver 已提交
64 65
}

66 67
func (c *Client) newGrpcClient(cc *grpc.ClientConn) interface{} {
	return datapb.NewDataCoordClient(cc)
D
dragondriver 已提交
68 69
}

70 71 72
func (c *Client) getDataCoordAddr() (string, error) {
	key := c.grpcClient.GetRole()
	msess, _, err := c.sess.GetSessions(key)
G
godchen 已提交
73
	if err != nil {
74
		log.Debug("DataCoordClient, getSessions failed", zap.Any("key", key), zap.Error(err))
G
godchen 已提交
75 76 77 78
		return "", err
	}
	ms, ok := msess[key]
	if !ok {
79 80
		log.Debug("DataCoordClient, not existed in msess ", zap.Any("key", key), zap.Any("len of msess", len(msess)))
		return "", fmt.Errorf("number of datacoord is incorrect, %d", len(msess))
G
godchen 已提交
81 82
	}
	return ms.Address, nil
S
sunby 已提交
83 84
}

S
sunby 已提交
85
// Init initializes the client
S
sunby 已提交
86 87 88 89
func (c *Client) Init() error {
	return nil
}

90
// Start enables the client
S
sunby 已提交
91 92 93 94 95
func (c *Client) Start() error {
	return nil
}

func (c *Client) Stop() error {
96
	return c.grpcClient.Close()
S
sunby 已提交
97 98
}

99
// Register dummy
100 101 102 103
func (c *Client) Register() error {
	return nil
}

G
godchen 已提交
104
func (c *Client) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
105 106
	log.Debug("ABC", zap.Any("ctx", ctx), zap.Any("func", c.grpcClient.ReCall))
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
107 108 109
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
110 111
		log.Debug("ABC", zap.Any("client", client))
		return client.(datapb.DataCoordClient).GetComponentStates(ctx, &internalpb.GetComponentStatesRequest{})
G
godchen 已提交
112
	})
D
dragondriver 已提交
113 114 115
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
116
	return ret.(*internalpb.ComponentStates), err
S
sunby 已提交
117 118
}

119
// GetTimeTickChannel return the name of time tick channel.
G
godchen 已提交
120
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
121
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
122 123 124
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
125
		return client.(datapb.DataCoordClient).GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
G
godchen 已提交
126
	})
D
dragondriver 已提交
127 128 129
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
130
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
131 132
}

133
// GetStatisticsChannel return the name of statistics channel.
G
godchen 已提交
134
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
135
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
136 137 138
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
139
		return client.(datapb.DataCoordClient).GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
G
godchen 已提交
140
	})
D
dragondriver 已提交
141 142 143
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
144
	return ret.(*milvuspb.StringResponse), err
S
sunby 已提交
145 146
}

147
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
148
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
149 150 151
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
152
		return client.(datapb.DataCoordClient).Flush(ctx, req)
G
godchen 已提交
153
	})
D
dragondriver 已提交
154 155 156
	if err != nil || ret == nil {
		return nil, err
	}
157
	return ret.(*datapb.FlushResponse), err
S
sunby 已提交
158 159
}

160 161 162 163 164 165 166 167 168 169 170 171 172
// AssignSegmentID applies allocations for specified Coolection/Partition and related Channel Name(Virtial Channel)
//
// ctx is the context to control request deadline and cancellation
// req contains the requester's info(id and role) and the list of Assignment Request,
// which coontains the specified collection, partitaion id, the related VChannel Name and row count it needs
//
// response struct `AssignSegmentIDResponse` contains the the assignment result for each request
// error is returned only when some communication issue occurs
// if some error occurs in the process of `AssignSegmentID`, it will be recorded and returned in `Status` field of response
//
// `AssignSegmentID` will applies current configured allocation policies for each request
// if the VChannel is newly used, `WatchDmlChannels` will be invoked to notify a `DataNode`(selected by policy) to watch it
// if there is anything make the allocation impossible, the response will not contain the corresponding result
G
godchen 已提交
173
func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
174
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
175 176 177
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
178
		return client.(datapb.DataCoordClient).AssignSegmentID(ctx, req)
G
godchen 已提交
179
	})
D
dragondriver 已提交
180 181 182
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
183
	return ret.(*datapb.AssignSegmentIDResponse), err
S
sunby 已提交
184 185
}

186 187 188 189 190 191 192 193 194
// GetSegmentStates requests segment state information
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment id to query
//
// response struct `GetSegmentStatesResponse` contains the list of each state query result
// 	when the segment is not found, the state entry will has the field `Status`  to identify failure
// 	otherwise the Segment State and Start position information will be returned
// error is returned only when some communication issue occurs
G
godchen 已提交
195
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
196
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
197 198 199
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
200
		return client.(datapb.DataCoordClient).GetSegmentStates(ctx, req)
G
godchen 已提交
201
	})
D
dragondriver 已提交
202 203 204
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
205
	return ret.(*datapb.GetSegmentStatesResponse), err
S
sunby 已提交
206 207
}

208 209 210 211 212 213 214 215
// GetInsertBinlogPaths requests binlog paths for specified segment
//
// ctx is the context to control request deadline and cancellation
// req contains the segment id to query
//
// response struct `GetInsertBinlogPathsResponse` contains the fields list
// 	and corresponding binlog path list
// error is returned only when some communication issue occurs
G
godchen 已提交
216
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
217
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
218 219 220
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
221
		return client.(datapb.DataCoordClient).GetInsertBinlogPaths(ctx, req)
G
godchen 已提交
222
	})
D
dragondriver 已提交
223 224 225
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
226
	return ret.(*datapb.GetInsertBinlogPathsResponse), err
S
sunby 已提交
227 228
}

229 230 231 232 233 234 235 236
// GetCollectionStatistics requests collection statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection id to query
//
// response struct `GetCollectionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
237
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
238
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
239 240 241
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
242
		return client.(datapb.DataCoordClient).GetCollectionStatistics(ctx, req)
G
godchen 已提交
243
	})
D
dragondriver 已提交
244 245 246
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
247
	return ret.(*datapb.GetCollectionStatisticsResponse), err
S
sunby 已提交
248 249
}

250 251 252 253 254 255 256 257
// GetPartitionStatistics requests partition statistics
//
// ctx is the context to control request deadline and cancellation
// req contains the collection and partition id to query
//
// response struct `GetPartitionStatisticsResponse` contains the key-value list fields returning related data
// 	only row count for now
// error is returned only when some communication issue occurs
G
godchen 已提交
258
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
259
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
260 261 262
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
263
		return client.(datapb.DataCoordClient).GetPartitionStatistics(ctx, req)
G
godchen 已提交
264
	})
D
dragondriver 已提交
265 266 267
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
268
	return ret.(*datapb.GetPartitionStatisticsResponse), err
S
sunby 已提交
269
}
N
neza2017 已提交
270

271 272
// GetSegmentInfoChannel DEPRECATED
// legacy api to get SegmentInfo Channel name
G
godchen 已提交
273
func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
274
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
275 276 277
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
278
		return client.(datapb.DataCoordClient).GetSegmentInfoChannel(ctx, &datapb.GetSegmentInfoChannelRequest{})
G
godchen 已提交
279
	})
D
dragondriver 已提交
280 281 282
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
283
	return ret.(*milvuspb.StringResponse), err
N
neza2017 已提交
284
}
285

286 287 288 289 290 291 292
// GetSegmentInfo requests segment info
//
// ctx is the context to control request deadline and cancellation
// req contains the list of segment ids to query
//
// response struct `GetSegmentInfoResponse` contains the list of segment info
// error is returned only when some communication issue occurs
G
godchen 已提交
293
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
294
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
295 296 297
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
298
		return client.(datapb.DataCoordClient).GetSegmentInfo(ctx, req)
G
godchen 已提交
299
	})
D
dragondriver 已提交
300 301 302
	if err != nil || ret == nil {
		return nil, err
	}
G
godchen 已提交
303
	return ret.(*datapb.GetSegmentInfoResponse), err
X
XuanYang-cn 已提交
304
}
305

306 307 308 309 310 311 312 313 314 315 316 317
// SaveBinlogPaths updates segments binlogs(including insert binlogs, stats logs and delta logs)
//  and related message stream positions
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response status contains the status/error code and failing reason if any
// error is returned only when some communication issue occurs
//
// there is a constraint that the `SaveBinlogPaths` requests of same segment shall be passed in sequence
// 	the root reason is each `SaveBinlogPaths` will overwrite the checkpoint position
//  if the constraint is broken, the checkpoint position will not be monotonically increasing and the integrity will be compromised
318
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
319 320 321 322 323 324 325 326 327
	// use Call here on purpose
	ret, err := c.grpcClient.Call(ctx, func(client interface{}) (interface{}, error) {
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
		return client.(datapb.DataCoordClient).SaveBinlogPaths(ctx, req)
	})
	if err != nil || ret == nil {
		return nil, err
D
dragondriver 已提交
328
	}
329
	return ret.(*commonpb.Status), err
330 331
}

332 333 334 335 336 337 338
// GetRecoveryInfo request segment recovery info of collection/partition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//
// response struct `GetRecoveryInfoResponse` contains the list of segments info and corresponding vchannel info
// error is returned only when some communication issue occurs
339
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
340
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
341 342 343
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
344
		return client.(datapb.DataCoordClient).GetRecoveryInfo(ctx, req)
345
	})
D
dragondriver 已提交
346 347 348
	if err != nil || ret == nil {
		return nil, err
	}
349 350
	return ret.(*datapb.GetRecoveryInfoResponse), err
}
351

352 353 354 355 356 357 358 359
// GetFlushedSegments returns flushed segment list of requested collection/parition
//
// ctx is the context to control request deadline and cancellation
// req contains the collection/partition id to query
//  when partition is lesser or equal to 0, all flushed segments of collection will be returned
//
// response struct `GetFlushedSegmentsResponse` contains flushed segment id list
// error is returned only when some communication issue occurs
360
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
361
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
362 363 364
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
365
		return client.(datapb.DataCoordClient).GetFlushedSegments(ctx, req)
366
	})
D
dragondriver 已提交
367 368 369
	if err != nil || ret == nil {
		return nil, err
	}
370 371
	return ret.(*datapb.GetFlushedSegmentsResponse), err
}
372

373
// GetMetrics gets all metrics of datacoord
374
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
375
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
376 377 378
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
379
		return client.(datapb.DataCoordClient).GetMetrics(ctx, req)
380
	})
D
dragondriver 已提交
381 382 383
	if err != nil || ret == nil {
		return nil, err
	}
384 385
	return ret.(*milvuspb.GetMetricsResponse), err
}
S
sunby 已提交
386 387

func (c *Client) CompleteCompaction(ctx context.Context, req *datapb.CompactionResult) (*commonpb.Status, error) {
388
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
389 390 391
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
392
		return client.(datapb.DataCoordClient).CompleteCompaction(ctx, req)
S
sunby 已提交
393 394 395 396 397 398 399
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*commonpb.Status), err
}

400
func (c *Client) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompactionRequest) (*milvuspb.ManualCompactionResponse, error) {
401
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
402 403 404
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
405
		return client.(datapb.DataCoordClient).ManualCompaction(ctx, req)
S
sunby 已提交
406 407 408 409
	})
	if err != nil || ret == nil {
		return nil, err
	}
410
	return ret.(*milvuspb.ManualCompactionResponse), err
S
sunby 已提交
411 412
}

413
func (c *Client) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
414
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
415 416 417
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
418
		return client.(datapb.DataCoordClient).GetCompactionState(ctx, req)
S
sunby 已提交
419 420 421 422
	})
	if err != nil || ret == nil {
		return nil, err
	}
423 424 425 426
	return ret.(*milvuspb.GetCompactionStateResponse), err
}

func (c *Client) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
427
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
428 429 430
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
431
		return client.(datapb.DataCoordClient).GetCompactionStateWithPlans(ctx, req)
432 433 434 435 436
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetCompactionPlansResponse), err
S
sunby 已提交
437
}
G
godchen 已提交
438

439
// WatchChannels notifies DataCoord to watch vchannels of a collection
G
godchen 已提交
440
func (c *Client) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
441
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
442 443 444
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
445
		return client.(datapb.DataCoordClient).WatchChannels(ctx, req)
G
godchen 已提交
446 447 448 449 450 451
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.WatchChannelsResponse), err
}
B
Bingyi Sun 已提交
452 453 454

// GetFlushState gets the flush state of multiple segments
func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
455
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
B
Bingyi Sun 已提交
456 457 458
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
459
		return client.(datapb.DataCoordClient).GetFlushState(ctx, req)
B
Bingyi Sun 已提交
460 461 462 463 464 465
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*milvuspb.GetFlushStateResponse), err
}
466 467 468

// DropVirtualChannel drops virtual channel in datacoord.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
469
	ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
470 471 472
		if !funcutil.CheckCtxValid(ctx) {
			return nil, ctx.Err()
		}
473
		return client.(datapb.DataCoordClient).DropVirtualChannel(ctx, req)
474 475 476 477 478 479
	})
	if err != nil || ret == nil {
		return nil, err
	}
	return ret.(*datapb.DropVirtualChannelResponse), err
}