query_node.go 16.7 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
Z
zhenshan.cao 已提交
17
	"errors"
18
	"fmt"
S
sunby 已提交
19
	"math/rand"
Z
zhenshan.cao 已提交
20
	"strconv"
X
Xiangyu Wang 已提交
21
	"strings"
C
cai.zhang 已提交
22
	"sync/atomic"
S
sunby 已提交
23
	"time"
24

B
bigsheeper 已提交
25 26 27
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"
G
groot 已提交
28
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
Xiangyu Wang 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
G
groot 已提交
30
	"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
31
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
G
godchen 已提交
32
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
Z
zhenshan.cao 已提交
33
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
34
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
Z
zhenshan.cao 已提交
35
	"github.com/zilliztech/milvus-distributed/internal/types"
36
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
37 38 39
)

type QueryNode struct {
X
XuanYang-cn 已提交
40
	queryNodeLoopCtx    context.Context
41
	queryNodeLoopCancel context.CancelFunc
42

43
	QueryNodeID UniqueID
C
cai.zhang 已提交
44
	stateCode   atomic.Value
B
bigsheeper 已提交
45

46
	replica ReplicaInterface
B
bigsheeper 已提交
47

48
	// internal services
Z
zhenshan.cao 已提交
49 50 51 52 53
	dataSyncServices map[UniqueID]*dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadService      *loadService
	statsService     *statsService
54

55
	// clients
T
ThreadDao 已提交
56 57 58 59
	masterService types.MasterService
	queryService  types.QueryService
	indexService  types.IndexService
	dataService   types.DataService
G
groot 已提交
60 61

	msFactory msgstream.Factory
B
bigsheeper 已提交
62
}
63

64
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
S
sunby 已提交
65
	rand.Seed(time.Now().UnixNano())
X
XuanYang-cn 已提交
66
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
67
	node := &QueryNode{
68 69 70 71
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

Z
zhenshan.cao 已提交
72 73 74 75
		dataSyncServices: make(map[UniqueID]*dataSyncService),
		metaService:      nil,
		searchService:    nil,
		statsService:     nil,
G
groot 已提交
76 77

		msFactory: factory,
78 79
	}

80
	node.replica = newCollectionReplica()
G
godchen 已提交
81
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
C
cai.zhang 已提交
82 83
	return node
}
G
godchen 已提交
84

G
groot 已提交
85
func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
86 87 88 89 90
	ctx1, cancel := context.WithCancel(ctx)
	node := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,

Z
zhenshan.cao 已提交
91 92 93 94
		dataSyncServices: make(map[UniqueID]*dataSyncService),
		metaService:      nil,
		searchService:    nil,
		statsService:     nil,
G
groot 已提交
95 96

		msFactory: factory,
97 98
	}

99
	node.replica = newCollectionReplica()
G
godchen 已提交
100
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
101

102
	return node
B
bigsheeper 已提交
103 104
}

N
neza2017 已提交
105
func (node *QueryNode) Init() error {
G
godchen 已提交
106
	ctx := context.Background()
X
xige-16 已提交
107
	registerReq := &queryPb.RegisterNodeRequest{
108 109 110
		Base: &commonpb.MsgBase{
			SourceID: Params.QueryNodeID,
		},
C
cai.zhang 已提交
111 112 113 114 115
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
116

T
ThreadDao 已提交
117
	resp, err := node.queryService.RegisterNode(ctx, registerReq)
C
cai.zhang 已提交
118 119 120
	if err != nil {
		panic(err)
	}
121
	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
122 123 124 125 126 127 128 129 130 131 132 133 134 135
		panic(resp.Status.Reason)
	}

	for _, kv := range resp.InitParams.StartParams {
		switch kv.Key {
		case "StatsChannelName":
			Params.StatsChannelName = kv.Value
		case "TimeTickChannelName":
			Params.QueryTimeTickChannelName = kv.Value
		case "QueryChannelName":
			Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
		case "QueryResultChannelName":
			Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
		default:
S
sunby 已提交
136
			return fmt.Errorf("Invalid key: %v", kv.Key)
137
		}
C
cai.zhang 已提交
138 139
	}

B
bigsheeper 已提交
140
	log.Debug("", zap.Int64("QueryNodeID", Params.QueryNodeID))
C
cai.zhang 已提交
141

T
ThreadDao 已提交
142
	if node.masterService == nil {
B
bigsheeper 已提交
143
		log.Error("null master service detected")
144 145
	}

T
ThreadDao 已提交
146
	if node.indexService == nil {
B
bigsheeper 已提交
147
		log.Error("null index service detected")
148 149
	}

T
ThreadDao 已提交
150
	if node.dataService == nil {
B
bigsheeper 已提交
151
		log.Error("null data service detected")
152 153
	}

154 155 156 157
	return nil
}

func (node *QueryNode) Start() error {
G
groot 已提交
158 159 160 161 162 163 164 165 166 167
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = node.msFactory.SetParams(m)
	if err != nil {
		return err
	}

X
XuanYang-cn 已提交
168
	// init services and manager
G
groot 已提交
169
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
B
bigsheeper 已提交
170
	//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
G
groot 已提交
171

Z
zhenshan.cao 已提交
172
	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica)
G
groot 已提交
173
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
B
bigsheeper 已提交
174

X
XuanYang-cn 已提交
175
	// start services
N
neza2017 已提交
176
	go node.searchService.start()
B
bigsheeper 已提交
177
	//go node.metaService.start()
178
	go node.loadService.start()
X
XuanYang-cn 已提交
179
	go node.statsService.start()
G
godchen 已提交
180
	node.UpdateStateCode(internalpb.StateCode_Healthy)
N
neza2017 已提交
181
	return nil
B
bigsheeper 已提交
182
}
B
bigsheeper 已提交
183

N
neza2017 已提交
184
func (node *QueryNode) Stop() error {
G
godchen 已提交
185
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
X
XuanYang-cn 已提交
186 187
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
188
	// free collectionReplica
X
XuanYang-cn 已提交
189
	node.replica.freeAll()
B
bigsheeper 已提交
190 191

	// close services
Z
zhenshan.cao 已提交
192 193 194 195
	for _, dsService := range node.dataSyncServices {
		if dsService != nil {
			dsService.close()
		}
B
bigsheeper 已提交
196 197
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
198
		node.searchService.close()
B
bigsheeper 已提交
199
	}
200 201
	if node.loadService != nil {
		node.loadService.close()
B
bigsheeper 已提交
202
	}
B
bigsheeper 已提交
203
	if node.statsService != nil {
X
XuanYang-cn 已提交
204
		node.statsService.close()
B
bigsheeper 已提交
205
	}
N
neza2017 已提交
206
	return nil
X
XuanYang-cn 已提交
207 208
}

G
godchen 已提交
209
func (node *QueryNode) UpdateStateCode(code internalpb.StateCode) {
210 211 212
	node.stateCode.Store(code)
}

T
ThreadDao 已提交
213
func (node *QueryNode) SetMasterService(master types.MasterService) error {
B
bigsheeper 已提交
214 215 216
	if master == nil {
		return errors.New("null master service interface")
	}
T
ThreadDao 已提交
217
	node.masterService = master
B
bigsheeper 已提交
218 219 220
	return nil
}

T
ThreadDao 已提交
221
func (node *QueryNode) SetQueryService(query types.QueryService) error {
222
	if query == nil {
B
bigsheeper 已提交
223
		return errors.New("null query service interface")
224
	}
T
ThreadDao 已提交
225
	node.queryService = query
226 227 228
	return nil
}

T
ThreadDao 已提交
229
func (node *QueryNode) SetIndexService(index types.IndexService) error {
230 231 232
	if index == nil {
		return errors.New("null index service interface")
	}
T
ThreadDao 已提交
233
	node.indexService = index
234 235 236
	return nil
}

T
ThreadDao 已提交
237
func (node *QueryNode) SetDataService(data types.DataService) error {
238 239 240
	if data == nil {
		return errors.New("null data service interface")
	}
T
ThreadDao 已提交
241
	node.dataService = data
242 243 244
	return nil
}

G
godchen 已提交
245 246
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
247
		Status: &commonpb.Status{
248
			ErrorCode: commonpb.ErrorCode_Success,
249 250
		},
	}
G
godchen 已提交
251
	code, ok := node.stateCode.Load().(internalpb.StateCode)
C
cai.zhang 已提交
252
	if !ok {
253 254
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
255
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
256 257 258
			Reason:    errMsg,
		}
		return stats, errors.New(errMsg)
C
cai.zhang 已提交
259
	}
G
godchen 已提交
260
	info := &internalpb.ComponentInfo{
C
cai.zhang 已提交
261
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
262
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
263 264
		StateCode: code,
	}
265
	stats.State = info
C
cai.zhang 已提交
266 267 268
	return stats, nil
}

G
godchen 已提交
269 270 271 272 273 274 275 276
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.QueryTimeTickChannelName,
	}, nil
C
cai.zhang 已提交
277 278
}

G
godchen 已提交
279 280 281 282 283 284 285 286
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
C
cai.zhang 已提交
287 288
}

G
godchen 已提交
289
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
290 291 292
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
293
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
294 295 296 297 298 299 300 301 302
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
xige-16 已提交
303
	node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
X
Xiangyu Wang 已提交
304
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
305 306 307

	// add result channel
	producerChannels := []string{in.ResultChannelID}
X
xige-16 已提交
308
	node.searchService.searchResultMsgStream.AsProducer(producerChannels)
X
Xiangyu Wang 已提交
309
	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
X
XuanYang-cn 已提交
310 311

	status := &commonpb.Status{
312
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
313 314 315 316
	}
	return status, nil
}

G
godchen 已提交
317
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
318 319 320
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
321
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
322 323 324 325 326 327
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
328
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
329 330 331
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
332
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
333 334 335 336 337 338
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
339
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
340 341 342
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
343
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
344 345 346 347 348 349 350 351 352 353
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
Z
zhenshan.cao 已提交
354
	searchStream.AsConsumer(consumeChannels, consumeSubName)
X
XuanYang-cn 已提交
355 356 357 358

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
Z
zhenshan.cao 已提交
359
	resultStream.AsProducer(producerChannels)
X
XuanYang-cn 已提交
360 361

	status := &commonpb.Status{
362
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
363 364 365 366
	}
	return status, nil
}

G
godchen 已提交
367
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
368 369 370 371 372
	log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
	collectionID := in.CollectionID
	service, ok := node.dataSyncServices[collectionID]
	if !ok || service.dmStream == nil {
		errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID)
X
XuanYang-cn 已提交
373
		status := &commonpb.Status{
374
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
375 376
			Reason:    errMsg,
		}
Z
zhenshan.cao 已提交
377
		log.Error(errMsg)
X
XuanYang-cn 已提交
378 379 380
		return status, errors.New(errMsg)
	}

Z
zhenshan.cao 已提交
381
	switch t := service.dmStream.(type) {
G
groot 已提交
382 383 384 385
	case *pulsarms.PulsarTtMsgStream:
	case *rmqms.RmqTtMsgStream:
	default:
		_ = t
X
XuanYang-cn 已提交
386 387
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
388
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
389 390
			Reason:    errMsg,
		}
Z
zhenshan.cao 已提交
391
		log.Error(errMsg)
X
XuanYang-cn 已提交
392 393 394
		return status, errors.New(errMsg)
	}

Z
zhenshan.cao 已提交
395 396 397 398 399
	getUniqueSubName := func() string {
		prefixName := Params.MsgChannelSubName
		return prefixName + "-" + strconv.FormatInt(collectionID, 10)
	}

X
XuanYang-cn 已提交
400 401
	// add request channel
	consumeChannels := in.ChannelIDs
Z
zhenshan.cao 已提交
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
	toSeekInfo := make([]*internalpb.MsgPosition, 0)
	toDirSubChannels := make([]string, 0)

	consumeSubName := getUniqueSubName()

	for _, info := range in.Infos {
		if len(info.Pos.MsgID) == 0 {
			toDirSubChannels = append(toDirSubChannels, info.ChannelID)
			continue
		}
		info.Pos.MsgGroup = consumeSubName
		toSeekInfo = append(toSeekInfo, info.Pos)

		log.Debug("prevent inserting segments", zap.String("segmentIDs", fmt.Sprintln(info.ExcludedSegments)))
		err := node.replica.addExcludedSegments(collectionID, info.ExcludedSegments)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
			log.Error(err.Error())
			return status, err
		}
	}

	service.dmStream.AsConsumer(toDirSubChannels, consumeSubName)
	for _, pos := range toSeekInfo {
		err := service.dmStream.Seek(pos)
		if err != nil {
			errMsg := "msgStream seek error :" + err.Error()
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    errMsg,
			}
			log.Error(errMsg)
			return status, errors.New(errMsg)
		}
	}
X
Xiangyu Wang 已提交
440
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
441 442

	status := &commonpb.Status{
443
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
444
	}
Z
zhenshan.cao 已提交
445
	log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
X
XuanYang-cn 已提交
446 447 448
	return status, nil
}

G
godchen 已提交
449
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
450
	// TODO: support db
Z
zhenshan.cao 已提交
451
	collectionID := in.CollectionID
C
cai.zhang 已提交
452 453
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
454
	fieldIDs := in.FieldIDs
455
	schema := in.Schema
456

B
bigsheeper 已提交
457
	log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(in)))
X
xige-16 已提交
458 459

	status := &commonpb.Status{
460
		ErrorCode: commonpb.ErrorCode_Success,
X
xige-16 已提交
461
	}
462 463 464
	hasCollection := node.replica.hasCollection(collectionID)
	hasPartition := node.replica.hasPartition(partitionID)
	if !hasCollection {
Z
zhenshan.cao 已提交
465
		// loading init
466 467
		err := node.replica.addCollection(collectionID, schema)
		if err != nil {
468
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
469
			status.Reason = err.Error()
470 471
			return status, err
		}
Z
zhenshan.cao 已提交
472 473 474 475 476
		node.replica.initExcludedSegments(collectionID)
		node.dataSyncServices[collectionID] = newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory, collectionID)
		go node.dataSyncServices[collectionID].start()
		node.replica.addTSafe(collectionID)
		node.searchService.register(collectionID)
477 478 479 480
	}
	if !hasPartition {
		err := node.replica.addPartition(collectionID, partitionID)
		if err != nil {
481
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
482
			status.Reason = err.Error()
483 484 485
			return status, err
		}
	}
486
	err := node.replica.enablePartition(partitionID)
C
cai.zhang 已提交
487
	if err != nil {
488
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
489 490 491 492 493 494 495 496
		status.Reason = err.Error()
		return status, err
	}

	if len(segmentIDs) == 0 {
		return status, nil
	}

Z
zhenshan.cao 已提交
497
	err = node.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs)
X
xige-16 已提交
498
	if err != nil {
499
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
500 501 502
		status.Reason = err.Error()
		return status, err
	}
Z
zhenshan.cao 已提交
503 504

	log.Debug("LoadSegments done", zap.String("segmentIDs", fmt.Sprintln(in.SegmentIDs)))
X
xige-16 已提交
505
	return status, nil
C
cai.zhang 已提交
506 507
}

G
godchen 已提交
508
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
509 510 511 512 513 514 515 516 517 518
	if _, ok := node.dataSyncServices[in.CollectionID]; ok {
		node.dataSyncServices[in.CollectionID].close()
		delete(node.dataSyncServices, in.CollectionID)
		node.searchService.tSafeMutex.Lock()
		delete(node.searchService.tSafeWatcher, in.CollectionID)
		node.searchService.tSafeMutex.Unlock()
		node.replica.removeTSafe(in.CollectionID)
		node.replica.removeExcludedSegments(in.CollectionID)
	}

B
bigsheeper 已提交
519 520 521
	err := node.replica.removeCollection(in.CollectionID)
	if err != nil {
		status := &commonpb.Status{
522
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
bigsheeper 已提交
523 524 525 526 527
			Reason:    err.Error(),
		}
		return status, err
	}

Z
zhenshan.cao 已提交
528
	log.Debug("ReleaseCollection done", zap.Int64("collectionID", in.CollectionID))
B
bigsheeper 已提交
529
	return &commonpb.Status{
530
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
531 532 533
	}, nil
}

G
godchen 已提交
534
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
535
	status := &commonpb.Status{
536
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
537
	}
C
cai.zhang 已提交
538
	for _, id := range in.PartitionIDs {
B
bigsheeper 已提交
539
		err := node.loadService.segLoader.replica.removePartition(id)
C
cai.zhang 已提交
540
		if err != nil {
B
bigsheeper 已提交
541
			// not return, try to release all partitions
542
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
543
			status.Reason = err.Error()
C
cai.zhang 已提交
544 545
		}
	}
B
bigsheeper 已提交
546 547
	return status, nil
}
C
cai.zhang 已提交
548

G
godchen 已提交
549
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
550
	status := &commonpb.Status{
551
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
552
	}
C
cai.zhang 已提交
553
	for _, id := range in.SegmentIDs {
B
bigsheeper 已提交
554 555 556
		err2 := node.loadService.segLoader.replica.removeSegment(id)
		if err2 != nil {
			// not return, try to release all segments
557
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
558
			status.Reason = err2.Error()
X
XuanYang-cn 已提交
559 560
		}
	}
B
bigsheeper 已提交
561
	return status, nil
X
XuanYang-cn 已提交
562
}
B
bigsheeper 已提交
563

G
godchen 已提交
564
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
B
bigsheeper 已提交
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581
	infos := make([]*queryPb.SegmentInfo, 0)
	for _, id := range in.SegmentIDs {
		segment, err := node.replica.getSegmentByID(id)
		if err != nil {
			continue
		}
		info := &queryPb.SegmentInfo{
			SegmentID:    segment.ID(),
			CollectionID: segment.collectionID,
			PartitionID:  segment.partitionID,
			MemSize:      segment.getMemSize(),
			NumRows:      segment.getRowCount(),
			IndexName:    segment.getIndexName(),
			IndexID:      segment.getIndexID(),
		}
		infos = append(infos, info)
	}
G
godchen 已提交
582
	return &queryPb.GetSegmentInfoResponse{
B
bigsheeper 已提交
583
		Status: &commonpb.Status{
584
			ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
585 586 587 588
		},
		Infos: infos,
	}, nil
}