query_node.go 15.3 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
17
	"fmt"
G
godchen 已提交
18
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
S
sunby 已提交
19
	"math/rand"
X
Xiangyu Wang 已提交
20
	"strings"
C
cai.zhang 已提交
21
	"sync/atomic"
S
sunby 已提交
22
	"time"
23

T
ThreadDao 已提交
24 25
	"github.com/zilliztech/milvus-distributed/internal/types"

S
sunby 已提交
26 27
	"errors"

B
bigsheeper 已提交
28 29 30
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"
G
groot 已提交
31
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
Xiangyu Wang 已提交
32
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
G
groot 已提交
33
	"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
34
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
G
godchen 已提交
35
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
36
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
37
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
38 39 40
)

type QueryNode struct {
X
XuanYang-cn 已提交
41
	queryNodeLoopCtx    context.Context
42
	queryNodeLoopCancel context.CancelFunc
43

44
	QueryNodeID UniqueID
C
cai.zhang 已提交
45
	stateCode   atomic.Value
B
bigsheeper 已提交
46

47
	replica ReplicaInterface
B
bigsheeper 已提交
48

49
	// internal services
50 51 52 53 54
	dataSyncService *dataSyncService
	metaService     *metaService
	searchService   *searchService
	loadService     *loadService
	statsService    *statsService
55

56
	// clients
T
ThreadDao 已提交
57 58 59 60
	masterService types.MasterService
	queryService  types.QueryService
	indexService  types.IndexService
	dataService   types.DataService
G
groot 已提交
61 62

	msFactory msgstream.Factory
B
bigsheeper 已提交
63
}
64

65
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
S
sunby 已提交
66
	rand.Seed(time.Now().UnixNano())
X
XuanYang-cn 已提交
67
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
68
	node := &QueryNode{
69 70 71 72 73 74 75 76
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
G
groot 已提交
77 78

		msFactory: factory,
79 80
	}

81
	node.replica = newCollectionReplica()
G
godchen 已提交
82
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
C
cai.zhang 已提交
83 84
	return node
}
G
godchen 已提交
85

G
groot 已提交
86
func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
87 88 89 90 91 92 93 94 95
	ctx1, cancel := context.WithCancel(ctx)
	node := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
G
groot 已提交
96 97

		msFactory: factory,
98 99
	}

100
	node.replica = newCollectionReplica()
G
godchen 已提交
101
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
102

103
	return node
B
bigsheeper 已提交
104 105
}

N
neza2017 已提交
106
func (node *QueryNode) Init() error {
G
godchen 已提交
107
	ctx := context.Background()
X
xige-16 已提交
108
	registerReq := &queryPb.RegisterNodeRequest{
109 110 111
		Base: &commonpb.MsgBase{
			SourceID: Params.QueryNodeID,
		},
C
cai.zhang 已提交
112 113 114 115 116
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
117

T
ThreadDao 已提交
118
	resp, err := node.queryService.RegisterNode(ctx, registerReq)
C
cai.zhang 已提交
119 120 121
	if err != nil {
		panic(err)
	}
122
	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
123 124 125 126 127 128 129 130 131 132 133 134 135 136
		panic(resp.Status.Reason)
	}

	for _, kv := range resp.InitParams.StartParams {
		switch kv.Key {
		case "StatsChannelName":
			Params.StatsChannelName = kv.Value
		case "TimeTickChannelName":
			Params.QueryTimeTickChannelName = kv.Value
		case "QueryChannelName":
			Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
		case "QueryResultChannelName":
			Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
		default:
S
sunby 已提交
137
			return fmt.Errorf("Invalid key: %v", kv.Key)
138
		}
C
cai.zhang 已提交
139 140
	}

B
bigsheeper 已提交
141
	log.Debug("", zap.Int64("QueryNodeID", Params.QueryNodeID))
C
cai.zhang 已提交
142

T
ThreadDao 已提交
143
	if node.masterService == nil {
B
bigsheeper 已提交
144
		log.Error("null master service detected")
145 146
	}

T
ThreadDao 已提交
147
	if node.indexService == nil {
B
bigsheeper 已提交
148
		log.Error("null index service detected")
149 150
	}

T
ThreadDao 已提交
151
	if node.dataService == nil {
B
bigsheeper 已提交
152
		log.Error("null data service detected")
153 154
	}

155 156 157 158
	return nil
}

func (node *QueryNode) Start() error {
G
groot 已提交
159 160 161 162 163 164 165 166 167 168
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = node.msFactory.SetParams(m)
	if err != nil {
		return err
	}

X
XuanYang-cn 已提交
169
	// init services and manager
G
groot 已提交
170 171
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
B
bigsheeper 已提交
172
	//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
G
groot 已提交
173

T
ThreadDao 已提交
174
	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica, node.dataSyncService.dmStream)
G
groot 已提交
175
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
B
bigsheeper 已提交
176

X
XuanYang-cn 已提交
177
	// start services
178
	go node.dataSyncService.start()
N
neza2017 已提交
179
	go node.searchService.start()
B
bigsheeper 已提交
180
	//go node.metaService.start()
181
	go node.loadService.start()
X
XuanYang-cn 已提交
182
	go node.statsService.start()
G
godchen 已提交
183
	node.UpdateStateCode(internalpb.StateCode_Healthy)
N
neza2017 已提交
184
	return nil
B
bigsheeper 已提交
185
}
B
bigsheeper 已提交
186

N
neza2017 已提交
187
func (node *QueryNode) Stop() error {
G
godchen 已提交
188
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
X
XuanYang-cn 已提交
189 190
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
191
	// free collectionReplica
X
XuanYang-cn 已提交
192
	node.replica.freeAll()
B
bigsheeper 已提交
193 194 195

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
196
		node.dataSyncService.close()
B
bigsheeper 已提交
197 198
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
199
		node.searchService.close()
B
bigsheeper 已提交
200
	}
201 202
	if node.loadService != nil {
		node.loadService.close()
B
bigsheeper 已提交
203
	}
B
bigsheeper 已提交
204
	if node.statsService != nil {
X
XuanYang-cn 已提交
205
		node.statsService.close()
B
bigsheeper 已提交
206
	}
N
neza2017 已提交
207
	return nil
X
XuanYang-cn 已提交
208 209
}

G
godchen 已提交
210
func (node *QueryNode) UpdateStateCode(code internalpb.StateCode) {
211 212 213
	node.stateCode.Store(code)
}

T
ThreadDao 已提交
214
func (node *QueryNode) SetMasterService(master types.MasterService) error {
B
bigsheeper 已提交
215 216 217
	if master == nil {
		return errors.New("null master service interface")
	}
T
ThreadDao 已提交
218
	node.masterService = master
B
bigsheeper 已提交
219 220 221
	return nil
}

T
ThreadDao 已提交
222
func (node *QueryNode) SetQueryService(query types.QueryService) error {
223
	if query == nil {
B
bigsheeper 已提交
224
		return errors.New("null query service interface")
225
	}
T
ThreadDao 已提交
226
	node.queryService = query
227 228 229
	return nil
}

T
ThreadDao 已提交
230
func (node *QueryNode) SetIndexService(index types.IndexService) error {
231 232 233
	if index == nil {
		return errors.New("null index service interface")
	}
T
ThreadDao 已提交
234
	node.indexService = index
235 236 237
	return nil
}

T
ThreadDao 已提交
238
func (node *QueryNode) SetDataService(data types.DataService) error {
239 240 241
	if data == nil {
		return errors.New("null data service interface")
	}
T
ThreadDao 已提交
242
	node.dataService = data
243 244 245
	return nil
}

G
godchen 已提交
246 247
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
248
		Status: &commonpb.Status{
249
			ErrorCode: commonpb.ErrorCode_Success,
250 251
		},
	}
G
godchen 已提交
252
	code, ok := node.stateCode.Load().(internalpb.StateCode)
C
cai.zhang 已提交
253
	if !ok {
254 255
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
256
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
257 258 259
			Reason:    errMsg,
		}
		return stats, errors.New(errMsg)
C
cai.zhang 已提交
260
	}
G
godchen 已提交
261
	info := &internalpb.ComponentInfo{
C
cai.zhang 已提交
262
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
263
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
264 265
		StateCode: code,
	}
266
	stats.State = info
C
cai.zhang 已提交
267 268 269
	return stats, nil
}

G
godchen 已提交
270 271 272 273 274 275 276 277
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.QueryTimeTickChannelName,
	}, nil
C
cai.zhang 已提交
278 279
}

G
godchen 已提交
280 281 282 283 284 285 286 287
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
C
cai.zhang 已提交
288 289
}

G
godchen 已提交
290
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
291 292 293
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
294
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
295 296 297 298 299 300 301 302 303
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
xige-16 已提交
304
	node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
X
Xiangyu Wang 已提交
305
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
306 307 308

	// add result channel
	producerChannels := []string{in.ResultChannelID}
X
xige-16 已提交
309
	node.searchService.searchResultMsgStream.AsProducer(producerChannels)
X
Xiangyu Wang 已提交
310
	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
X
XuanYang-cn 已提交
311 312

	status := &commonpb.Status{
313
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
314 315 316 317
	}
	return status, nil
}

G
godchen 已提交
318
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
319 320 321
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
322
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
323 324 325 326 327 328
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
329
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
330 331 332
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
333
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
334 335 336 337 338 339
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
340
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
341 342 343
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
344
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
345 346 347 348 349 350 351 352 353 354
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
Z
zhenshan.cao 已提交
355
	searchStream.AsConsumer(consumeChannels, consumeSubName)
X
XuanYang-cn 已提交
356 357 358 359

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
Z
zhenshan.cao 已提交
360
	resultStream.AsProducer(producerChannels)
X
XuanYang-cn 已提交
361 362

	status := &commonpb.Status{
363
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
364 365 366 367
	}
	return status, nil
}

G
godchen 已提交
368
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
369 370 371
	if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
		errMsg := "null data sync service or null data manipulation stream"
		status := &commonpb.Status{
372
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
373 374 375 376 377 378
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

G
groot 已提交
379 380 381 382 383
	switch t := node.dataSyncService.dmStream.(type) {
	case *pulsarms.PulsarTtMsgStream:
	case *rmqms.RmqTtMsgStream:
	default:
		_ = t
X
XuanYang-cn 已提交
384 385
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
386
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
387 388 389 390 391 392 393 394 395
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := in.ChannelIDs
	consumeSubName := Params.MsgChannelSubName
G
groot 已提交
396
	node.dataSyncService.dmStream.AsConsumer(consumeChannels, consumeSubName)
X
Xiangyu Wang 已提交
397
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
398 399

	status := &commonpb.Status{
400
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
401 402 403 404
	}
	return status, nil
}

G
godchen 已提交
405
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
406
	// TODO: support db
Z
zhenshan.cao 已提交
407
	collectionID := in.CollectionID
C
cai.zhang 已提交
408 409
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
410
	fieldIDs := in.FieldIDs
411
	schema := in.Schema
412

B
bigsheeper 已提交
413
	log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(in)))
X
xige-16 已提交
414 415

	status := &commonpb.Status{
416
		ErrorCode: commonpb.ErrorCode_Success,
X
xige-16 已提交
417
	}
418 419 420 421 422
	hasCollection := node.replica.hasCollection(collectionID)
	hasPartition := node.replica.hasPartition(partitionID)
	if !hasCollection {
		err := node.replica.addCollection(collectionID, schema)
		if err != nil {
423
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
424
			status.Reason = err.Error()
425 426 427 428 429 430
			return status, err
		}
	}
	if !hasPartition {
		err := node.replica.addPartition(collectionID, partitionID)
		if err != nil {
431
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
432
			status.Reason = err.Error()
433 434 435
			return status, err
		}
	}
436
	err := node.replica.enablePartition(partitionID)
C
cai.zhang 已提交
437
	if err != nil {
438
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
439 440 441 442 443 444 445 446 447 448
		status.Reason = err.Error()
		return status, err
	}

	if len(segmentIDs) == 0 {
		return status, nil
	}

	if len(in.SegmentIDs) != len(in.SegmentStates) {
		err := errors.New("len(segmentIDs) should equal to len(segmentStates)")
449
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
450
		status.Reason = err.Error()
C
cai.zhang 已提交
451 452 453
		return status, err
	}

454
	// segments are ordered before LoadSegments calling
G
godchen 已提交
455
	//var position *internalpb.MsgPosition = nil
456
	for i, state := range in.SegmentStates {
457
		//thisPosition := state.StartPosition
T
ThreadDao 已提交
458
		if state.State <= commonpb.SegmentState_Growing {
459 460 461 462 463
			//if position == nil {
			//	position = &internalpb2.MsgPosition{
			//		ChannelName: thisPosition.ChannelName,
			//	}
			//}
464 465
			segmentIDs = segmentIDs[:i]
			break
C
cai.zhang 已提交
466
		}
467
		//position = state.StartPosition
468 469
	}

470 471 472 473 474 475 476 477
	//err = node.dataSyncService.seekSegment(position)
	//if err != nil {
	//	status := &commonpb.Status{
	//		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	//		Reason:    err.Error(),
	//	}
	//	return status, err
	//}
X
xige-16 已提交
478 479 480

	err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
	if err != nil {
481
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
482 483 484 485
		status.Reason = err.Error()
		return status, err
	}
	return status, nil
C
cai.zhang 已提交
486 487
}

G
godchen 已提交
488
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
489 490 491
	err := node.replica.removeCollection(in.CollectionID)
	if err != nil {
		status := &commonpb.Status{
492
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
bigsheeper 已提交
493 494 495 496 497 498
			Reason:    err.Error(),
		}
		return status, err
	}

	return &commonpb.Status{
499
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
500 501 502
	}, nil
}

G
godchen 已提交
503
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
504
	status := &commonpb.Status{
505
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
506
	}
C
cai.zhang 已提交
507
	for _, id := range in.PartitionIDs {
B
bigsheeper 已提交
508
		err := node.loadService.segLoader.replica.removePartition(id)
C
cai.zhang 已提交
509
		if err != nil {
B
bigsheeper 已提交
510
			// not return, try to release all partitions
511
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
512
			status.Reason = err.Error()
C
cai.zhang 已提交
513 514
		}
	}
B
bigsheeper 已提交
515 516
	return status, nil
}
C
cai.zhang 已提交
517

G
godchen 已提交
518
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
519
	status := &commonpb.Status{
520
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
521
	}
C
cai.zhang 已提交
522
	for _, id := range in.SegmentIDs {
B
bigsheeper 已提交
523 524 525
		err2 := node.loadService.segLoader.replica.removeSegment(id)
		if err2 != nil {
			// not return, try to release all segments
526
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
527
			status.Reason = err2.Error()
X
XuanYang-cn 已提交
528 529
		}
	}
B
bigsheeper 已提交
530
	return status, nil
X
XuanYang-cn 已提交
531
}
B
bigsheeper 已提交
532

G
godchen 已提交
533
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
B
bigsheeper 已提交
534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
	infos := make([]*queryPb.SegmentInfo, 0)
	for _, id := range in.SegmentIDs {
		segment, err := node.replica.getSegmentByID(id)
		if err != nil {
			continue
		}
		info := &queryPb.SegmentInfo{
			SegmentID:    segment.ID(),
			CollectionID: segment.collectionID,
			PartitionID:  segment.partitionID,
			MemSize:      segment.getMemSize(),
			NumRows:      segment.getRowCount(),
			IndexName:    segment.getIndexName(),
			IndexID:      segment.getIndexID(),
		}
		infos = append(infos, info)
	}
G
godchen 已提交
551
	return &queryPb.GetSegmentInfoResponse{
B
bigsheeper 已提交
552
		Status: &commonpb.Status{
553
			ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
554 555 556 557
		},
		Infos: infos,
	}, nil
}