query_node.go 17.9 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
Z
zhenshan.cao 已提交
17
	"errors"
18
	"fmt"
S
sunby 已提交
19
	"math/rand"
Z
zhenshan.cao 已提交
20
	"strconv"
X
Xiangyu Wang 已提交
21
	"strings"
22
	"sync"
C
cai.zhang 已提交
23
	"sync/atomic"
S
sunby 已提交
24
	"time"
25

B
bigsheeper 已提交
26 27 28
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"
G
groot 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
30
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
G
godchen 已提交
31
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
Z
zhenshan.cao 已提交
32
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
33
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
Z
zhenshan.cao 已提交
34
	"github.com/zilliztech/milvus-distributed/internal/types"
35
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
36 37 38
)

type QueryNode struct {
X
XuanYang-cn 已提交
39
	queryNodeLoopCtx    context.Context
40
	queryNodeLoopCancel context.CancelFunc
41

42
	QueryNodeID UniqueID
C
cai.zhang 已提交
43
	stateCode   atomic.Value
B
bigsheeper 已提交
44

45
	replica ReplicaInterface
B
bigsheeper 已提交
46

47
	// internal services
Z
zhenshan.cao 已提交
48 49 50 51
	metaService      *metaService
	searchService    *searchService
	loadService      *loadService
	statsService     *statsService
52 53
	dsServicesMu     sync.Mutex // guards dataSyncServices
	dataSyncServices map[UniqueID]*dataSyncService
54

55
	// clients
T
ThreadDao 已提交
56 57 58 59
	masterService types.MasterService
	queryService  types.QueryService
	indexService  types.IndexService
	dataService   types.DataService
G
groot 已提交
60 61

	msFactory msgstream.Factory
B
bigsheeper 已提交
62
}
63

64
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
S
sunby 已提交
65
	rand.Seed(time.Now().UnixNano())
X
XuanYang-cn 已提交
66
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
67
	node := &QueryNode{
68 69 70 71
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

Z
zhenshan.cao 已提交
72 73 74 75
		dataSyncServices: make(map[UniqueID]*dataSyncService),
		metaService:      nil,
		searchService:    nil,
		statsService:     nil,
G
groot 已提交
76 77

		msFactory: factory,
78 79
	}

80
	node.replica = newCollectionReplica()
G
godchen 已提交
81
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
C
cai.zhang 已提交
82 83
	return node
}
G
godchen 已提交
84

G
groot 已提交
85
func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
86 87 88 89 90
	ctx1, cancel := context.WithCancel(ctx)
	node := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,

Z
zhenshan.cao 已提交
91 92 93 94
		dataSyncServices: make(map[UniqueID]*dataSyncService),
		metaService:      nil,
		searchService:    nil,
		statsService:     nil,
G
groot 已提交
95 96

		msFactory: factory,
97 98
	}

99
	node.replica = newCollectionReplica()
G
godchen 已提交
100
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
101

102
	return node
B
bigsheeper 已提交
103 104
}

N
neza2017 已提交
105
func (node *QueryNode) Init() error {
G
godchen 已提交
106
	ctx := context.Background()
X
xige-16 已提交
107
	registerReq := &queryPb.RegisterNodeRequest{
108 109 110
		Base: &commonpb.MsgBase{
			SourceID: Params.QueryNodeID,
		},
C
cai.zhang 已提交
111 112 113 114 115
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
116

T
ThreadDao 已提交
117
	resp, err := node.queryService.RegisterNode(ctx, registerReq)
C
cai.zhang 已提交
118 119 120
	if err != nil {
		panic(err)
	}
121
	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
122 123 124 125 126 127 128 129 130 131 132 133 134 135
		panic(resp.Status.Reason)
	}

	for _, kv := range resp.InitParams.StartParams {
		switch kv.Key {
		case "StatsChannelName":
			Params.StatsChannelName = kv.Value
		case "TimeTickChannelName":
			Params.QueryTimeTickChannelName = kv.Value
		case "QueryChannelName":
			Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
		case "QueryResultChannelName":
			Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
		default:
S
sunby 已提交
136
			return fmt.Errorf("Invalid key: %v", kv.Key)
137
		}
C
cai.zhang 已提交
138 139
	}

B
bigsheeper 已提交
140
	log.Debug("", zap.Int64("QueryNodeID", Params.QueryNodeID))
C
cai.zhang 已提交
141

T
ThreadDao 已提交
142
	if node.masterService == nil {
B
bigsheeper 已提交
143
		log.Error("null master service detected")
144 145
	}

T
ThreadDao 已提交
146
	if node.indexService == nil {
B
bigsheeper 已提交
147
		log.Error("null index service detected")
148 149
	}

T
ThreadDao 已提交
150
	if node.dataService == nil {
B
bigsheeper 已提交
151
		log.Error("null data service detected")
152 153
	}

154 155 156 157
	return nil
}

func (node *QueryNode) Start() error {
G
groot 已提交
158 159 160 161 162 163 164 165 166 167
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = node.msFactory.SetParams(m)
	if err != nil {
		return err
	}

X
XuanYang-cn 已提交
168
	// init services and manager
G
groot 已提交
169
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
B
bigsheeper 已提交
170
	//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
G
groot 已提交
171

Z
zhenshan.cao 已提交
172
	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica)
G
groot 已提交
173
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
B
bigsheeper 已提交
174

X
XuanYang-cn 已提交
175
	// start services
N
neza2017 已提交
176
	go node.searchService.start()
B
bigsheeper 已提交
177
	//go node.metaService.start()
178
	go node.loadService.start()
X
XuanYang-cn 已提交
179
	go node.statsService.start()
G
godchen 已提交
180
	node.UpdateStateCode(internalpb.StateCode_Healthy)
N
neza2017 已提交
181
	return nil
B
bigsheeper 已提交
182
}
B
bigsheeper 已提交
183

N
neza2017 已提交
184
func (node *QueryNode) Stop() error {
G
godchen 已提交
185
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
X
XuanYang-cn 已提交
186 187
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
188
	// free collectionReplica
X
XuanYang-cn 已提交
189
	node.replica.freeAll()
B
bigsheeper 已提交
190 191

	// close services
Z
zhenshan.cao 已提交
192 193 194 195
	for _, dsService := range node.dataSyncServices {
		if dsService != nil {
			dsService.close()
		}
B
bigsheeper 已提交
196 197
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
198
		node.searchService.close()
B
bigsheeper 已提交
199
	}
200 201
	if node.loadService != nil {
		node.loadService.close()
B
bigsheeper 已提交
202
	}
B
bigsheeper 已提交
203
	if node.statsService != nil {
X
XuanYang-cn 已提交
204
		node.statsService.close()
B
bigsheeper 已提交
205
	}
N
neza2017 已提交
206
	return nil
X
XuanYang-cn 已提交
207 208
}

G
godchen 已提交
209
func (node *QueryNode) UpdateStateCode(code internalpb.StateCode) {
210 211 212
	node.stateCode.Store(code)
}

T
ThreadDao 已提交
213
func (node *QueryNode) SetMasterService(master types.MasterService) error {
B
bigsheeper 已提交
214 215 216
	if master == nil {
		return errors.New("null master service interface")
	}
T
ThreadDao 已提交
217
	node.masterService = master
B
bigsheeper 已提交
218 219 220
	return nil
}

T
ThreadDao 已提交
221
func (node *QueryNode) SetQueryService(query types.QueryService) error {
222
	if query == nil {
B
bigsheeper 已提交
223
		return errors.New("null query service interface")
224
	}
T
ThreadDao 已提交
225
	node.queryService = query
226 227 228
	return nil
}

T
ThreadDao 已提交
229
func (node *QueryNode) SetIndexService(index types.IndexService) error {
230 231 232
	if index == nil {
		return errors.New("null index service interface")
	}
T
ThreadDao 已提交
233
	node.indexService = index
234 235 236
	return nil
}

T
ThreadDao 已提交
237
func (node *QueryNode) SetDataService(data types.DataService) error {
238 239 240
	if data == nil {
		return errors.New("null data service interface")
	}
T
ThreadDao 已提交
241
	node.dataService = data
242 243 244
	return nil
}

245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
func (node *QueryNode) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) {
	node.dsServicesMu.Lock()
	defer node.dsServicesMu.Unlock()
	ds, ok := node.dataSyncServices[collectionID]
	if !ok {
		return nil, errors.New("cannot found dataSyncService, collectionID =" + fmt.Sprintln(collectionID))
	}
	return ds, nil
}

func (node *QueryNode) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error {
	node.dsServicesMu.Lock()
	defer node.dsServicesMu.Unlock()
	if _, ok := node.dataSyncServices[collectionID]; ok {
		return errors.New("dataSyncService has been existed, collectionID =" + fmt.Sprintln(collectionID))
	}
	node.dataSyncServices[collectionID] = ds
	return nil
}

func (node *QueryNode) removeDataSyncService(collectionID UniqueID) {
	node.dsServicesMu.Lock()
	defer node.dsServicesMu.Unlock()
	delete(node.dataSyncServices, collectionID)
}

G
godchen 已提交
271 272
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
273
		Status: &commonpb.Status{
274
			ErrorCode: commonpb.ErrorCode_Success,
275 276
		},
	}
G
godchen 已提交
277
	code, ok := node.stateCode.Load().(internalpb.StateCode)
C
cai.zhang 已提交
278
	if !ok {
279 280
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
281
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
282 283 284
			Reason:    errMsg,
		}
		return stats, errors.New(errMsg)
C
cai.zhang 已提交
285
	}
G
godchen 已提交
286
	info := &internalpb.ComponentInfo{
C
cai.zhang 已提交
287
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
288
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
289 290
		StateCode: code,
	}
291
	stats.State = info
C
cai.zhang 已提交
292 293 294
	return stats, nil
}

G
godchen 已提交
295 296 297 298 299 300 301 302
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.QueryTimeTickChannelName,
	}, nil
C
cai.zhang 已提交
303 304
}

G
godchen 已提交
305 306 307 308 309 310 311 312
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
C
cai.zhang 已提交
313 314
}

G
godchen 已提交
315
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
316 317 318
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
319
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
320 321 322 323 324 325 326 327 328
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
xige-16 已提交
329
	node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
X
Xiangyu Wang 已提交
330
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
331 332 333

	// add result channel
	producerChannels := []string{in.ResultChannelID}
X
xige-16 已提交
334
	node.searchService.searchResultMsgStream.AsProducer(producerChannels)
X
Xiangyu Wang 已提交
335
	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
X
XuanYang-cn 已提交
336 337

	status := &commonpb.Status{
338
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
339 340 341 342
	}
	return status, nil
}

G
godchen 已提交
343
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
X
Xiangyu Wang 已提交
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
	// if node.searchService == nil || node.searchService.searchMsgStream == nil {
	// 	errMsg := "null search service or null search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// // remove request channel
	// consumeChannels := []string{in.RequestChannelID}
	// consumeSubName := Params.MsgChannelSubName
	// // TODO: searchStream.RemovePulsarConsumers(producerChannels)
	// searchStream.AsConsumer(consumeChannels, consumeSubName)

	// // remove result channel
	// producerChannels := []string{in.ResultChannelID}
	// // TODO: resultStream.RemovePulsarProducer(producerChannels)
	// resultStream.AsProducer(producerChannels)
X
XuanYang-cn 已提交
386 387

	status := &commonpb.Status{
388
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
389 390 391 392
	}
	return status, nil
}

G
godchen 已提交
393
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
394 395
	log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
	collectionID := in.CollectionID
396 397
	ds, err := node.getDataSyncService(collectionID)
	if err != nil || ds.dmStream == nil {
Z
zhenshan.cao 已提交
398
		errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID)
X
XuanYang-cn 已提交
399
		status := &commonpb.Status{
400
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
401 402
			Reason:    errMsg,
		}
Z
zhenshan.cao 已提交
403
		log.Error(errMsg)
X
XuanYang-cn 已提交
404 405 406
		return status, errors.New(errMsg)
	}

407
	switch t := ds.dmStream.(type) {
X
Xiangyu Wang 已提交
408
	case *msgstream.MqTtMsgStream:
G
groot 已提交
409 410
	default:
		_ = t
X
XuanYang-cn 已提交
411 412
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
413
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
414 415
			Reason:    errMsg,
		}
Z
zhenshan.cao 已提交
416
		log.Error(errMsg)
X
XuanYang-cn 已提交
417 418 419
		return status, errors.New(errMsg)
	}

Z
zhenshan.cao 已提交
420 421 422 423 424
	getUniqueSubName := func() string {
		prefixName := Params.MsgChannelSubName
		return prefixName + "-" + strconv.FormatInt(collectionID, 10)
	}

X
XuanYang-cn 已提交
425 426
	// add request channel
	consumeChannels := in.ChannelIDs
Z
zhenshan.cao 已提交
427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451
	toSeekInfo := make([]*internalpb.MsgPosition, 0)
	toDirSubChannels := make([]string, 0)

	consumeSubName := getUniqueSubName()

	for _, info := range in.Infos {
		if len(info.Pos.MsgID) == 0 {
			toDirSubChannels = append(toDirSubChannels, info.ChannelID)
			continue
		}
		info.Pos.MsgGroup = consumeSubName
		toSeekInfo = append(toSeekInfo, info.Pos)

		log.Debug("prevent inserting segments", zap.String("segmentIDs", fmt.Sprintln(info.ExcludedSegments)))
		err := node.replica.addExcludedSegments(collectionID, info.ExcludedSegments)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
			log.Error(err.Error())
			return status, err
		}
	}

452
	ds.dmStream.AsConsumer(toDirSubChannels, consumeSubName)
Z
zhenshan.cao 已提交
453
	for _, pos := range toSeekInfo {
454
		err := ds.dmStream.Seek(pos)
Z
zhenshan.cao 已提交
455 456 457 458 459 460 461 462 463 464
		if err != nil {
			errMsg := "msgStream seek error :" + err.Error()
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    errMsg,
			}
			log.Error(errMsg)
			return status, errors.New(errMsg)
		}
	}
X
Xiangyu Wang 已提交
465
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
466 467

	status := &commonpb.Status{
468
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
469
	}
Z
zhenshan.cao 已提交
470
	log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
X
XuanYang-cn 已提交
471 472 473
	return status, nil
}

G
godchen 已提交
474
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
475
	// TODO: support db
Z
zhenshan.cao 已提交
476
	collectionID := in.CollectionID
C
cai.zhang 已提交
477 478
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
479
	fieldIDs := in.FieldIDs
480
	schema := in.Schema
481

B
bigsheeper 已提交
482
	log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(in)))
X
xige-16 已提交
483 484

	status := &commonpb.Status{
485
		ErrorCode: commonpb.ErrorCode_Success,
X
xige-16 已提交
486
	}
487 488 489
	hasCollection := node.replica.hasCollection(collectionID)
	hasPartition := node.replica.hasPartition(partitionID)
	if !hasCollection {
Z
zhenshan.cao 已提交
490
		// loading init
491 492
		err := node.replica.addCollection(collectionID, schema)
		if err != nil {
493
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
494
			status.Reason = err.Error()
495 496
			return status, err
		}
Z
zhenshan.cao 已提交
497
		node.replica.initExcludedSegments(collectionID)
498 499 500 501 502 503 504 505 506 507
		newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory, collectionID)
		// ignore duplicated dataSyncService error
		node.addDataSyncService(collectionID, newDS)
		ds, err := node.getDataSyncService(collectionID)
		if err != nil {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
			return status, err
		}
		go ds.start()
B
bigsheeper 已提交
508
		node.searchService.startSearchCollection(collectionID)
509 510 511 512
	}
	if !hasPartition {
		err := node.replica.addPartition(collectionID, partitionID)
		if err != nil {
513
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
514
			status.Reason = err.Error()
515 516 517
			return status, err
		}
	}
518
	err := node.replica.enablePartition(partitionID)
C
cai.zhang 已提交
519
	if err != nil {
520
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
521 522 523 524 525 526 527 528
		status.Reason = err.Error()
		return status, err
	}

	if len(segmentIDs) == 0 {
		return status, nil
	}

Z
zhenshan.cao 已提交
529
	err = node.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs)
X
xige-16 已提交
530
	if err != nil {
531
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
532 533 534
		status.Reason = err.Error()
		return status, err
	}
Z
zhenshan.cao 已提交
535 536

	log.Debug("LoadSegments done", zap.String("segmentIDs", fmt.Sprintln(in.SegmentIDs)))
X
xige-16 已提交
537
	return status, nil
C
cai.zhang 已提交
538 539
}

G
godchen 已提交
540
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
541 542 543 544
	ds, err := node.getDataSyncService(in.CollectionID)
	if err == nil && ds != nil {
		ds.close()
		node.removeDataSyncService(in.CollectionID)
Z
zhenshan.cao 已提交
545 546 547 548
		node.replica.removeTSafe(in.CollectionID)
		node.replica.removeExcludedSegments(in.CollectionID)
	}

B
bigsheeper 已提交
549 550 551 552
	if node.searchService.hasSearchCollection(in.CollectionID) {
		node.searchService.stopSearchCollection(in.CollectionID)
	}

553
	err = node.replica.removeCollection(in.CollectionID)
B
bigsheeper 已提交
554 555
	if err != nil {
		status := &commonpb.Status{
556
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
bigsheeper 已提交
557 558 559 560 561
			Reason:    err.Error(),
		}
		return status, err
	}

Z
zhenshan.cao 已提交
562
	log.Debug("ReleaseCollection done", zap.Int64("collectionID", in.CollectionID))
B
bigsheeper 已提交
563
	return &commonpb.Status{
564
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
565 566 567
	}, nil
}

G
godchen 已提交
568
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
569
	status := &commonpb.Status{
570
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
571
	}
C
cai.zhang 已提交
572
	for _, id := range in.PartitionIDs {
B
bigsheeper 已提交
573
		err := node.loadService.segLoader.replica.removePartition(id)
C
cai.zhang 已提交
574
		if err != nil {
B
bigsheeper 已提交
575
			// not return, try to release all partitions
576
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
577
			status.Reason = err.Error()
C
cai.zhang 已提交
578 579
		}
	}
B
bigsheeper 已提交
580 581
	return status, nil
}
C
cai.zhang 已提交
582

G
godchen 已提交
583
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
584
	status := &commonpb.Status{
585
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
586
	}
C
cai.zhang 已提交
587
	for _, id := range in.SegmentIDs {
B
bigsheeper 已提交
588 589 590
		err2 := node.loadService.segLoader.replica.removeSegment(id)
		if err2 != nil {
			// not return, try to release all segments
591
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
592
			status.Reason = err2.Error()
X
XuanYang-cn 已提交
593 594
		}
	}
B
bigsheeper 已提交
595
	return status, nil
X
XuanYang-cn 已提交
596
}
B
bigsheeper 已提交
597

G
godchen 已提交
598
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
B
bigsheeper 已提交
599 600 601 602 603 604
	infos := make([]*queryPb.SegmentInfo, 0)
	for _, id := range in.SegmentIDs {
		segment, err := node.replica.getSegmentByID(id)
		if err != nil {
			continue
		}
X
xige-16 已提交
605 606 607 608 609 610 611 612 613 614
		var indexName string
		var indexID int64
		// TODO:: segment has multi vec column
		if len(segment.indexInfos) > 0 {
			for fieldID := range segment.indexInfos {
				indexName = segment.getIndexName(fieldID)
				indexID = segment.getIndexID(fieldID)
				break
			}
		}
B
bigsheeper 已提交
615 616 617 618 619 620
		info := &queryPb.SegmentInfo{
			SegmentID:    segment.ID(),
			CollectionID: segment.collectionID,
			PartitionID:  segment.partitionID,
			MemSize:      segment.getMemSize(),
			NumRows:      segment.getRowCount(),
X
xige-16 已提交
621 622
			IndexName:    indexName,
			IndexID:      indexID,
B
bigsheeper 已提交
623 624 625
		}
		infos = append(infos, info)
	}
G
godchen 已提交
626
	return &queryPb.GetSegmentInfoResponse{
B
bigsheeper 已提交
627
		Status: &commonpb.Status{
628
			ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
629 630 631 632
		},
		Infos: infos,
	}, nil
}