query_node.go 17.7 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
Z
zhenshan.cao 已提交
17
	"errors"
18
	"fmt"
S
sunby 已提交
19
	"math/rand"
Z
zhenshan.cao 已提交
20
	"strconv"
X
Xiangyu Wang 已提交
21
	"strings"
22
	"sync"
C
cai.zhang 已提交
23
	"sync/atomic"
S
sunby 已提交
24
	"time"
25

B
bigsheeper 已提交
26 27 28
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"
G
groot 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
Y
yukun 已提交
30
	"github.com/zilliztech/milvus-distributed/internal/msgstream/ms"
X
Xiangyu Wang 已提交
31
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
32
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
G
godchen 已提交
33
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
Z
zhenshan.cao 已提交
34
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
35
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
Z
zhenshan.cao 已提交
36
	"github.com/zilliztech/milvus-distributed/internal/types"
37
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
38 39 40
)

type QueryNode struct {
X
XuanYang-cn 已提交
41
	queryNodeLoopCtx    context.Context
42
	queryNodeLoopCancel context.CancelFunc
43

44
	QueryNodeID UniqueID
C
cai.zhang 已提交
45
	stateCode   atomic.Value
B
bigsheeper 已提交
46

47
	replica ReplicaInterface
B
bigsheeper 已提交
48

49
	// internal services
Z
zhenshan.cao 已提交
50 51 52 53
	metaService      *metaService
	searchService    *searchService
	loadService      *loadService
	statsService     *statsService
54 55
	dsServicesMu     sync.Mutex // guards dataSyncServices
	dataSyncServices map[UniqueID]*dataSyncService
56

57
	// clients
T
ThreadDao 已提交
58 59 60 61
	masterService types.MasterService
	queryService  types.QueryService
	indexService  types.IndexService
	dataService   types.DataService
G
groot 已提交
62 63

	msFactory msgstream.Factory
B
bigsheeper 已提交
64
}
65

66
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
S
sunby 已提交
67
	rand.Seed(time.Now().UnixNano())
X
XuanYang-cn 已提交
68
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
69
	node := &QueryNode{
70 71 72 73
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

Z
zhenshan.cao 已提交
74 75 76 77
		dataSyncServices: make(map[UniqueID]*dataSyncService),
		metaService:      nil,
		searchService:    nil,
		statsService:     nil,
G
groot 已提交
78 79

		msFactory: factory,
80 81
	}

82
	node.replica = newCollectionReplica()
G
godchen 已提交
83
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
C
cai.zhang 已提交
84 85
	return node
}
G
godchen 已提交
86

G
groot 已提交
87
func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
88 89 90 91 92
	ctx1, cancel := context.WithCancel(ctx)
	node := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,

Z
zhenshan.cao 已提交
93 94 95 96
		dataSyncServices: make(map[UniqueID]*dataSyncService),
		metaService:      nil,
		searchService:    nil,
		statsService:     nil,
G
groot 已提交
97 98

		msFactory: factory,
99 100
	}

101
	node.replica = newCollectionReplica()
G
godchen 已提交
102
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
103

104
	return node
B
bigsheeper 已提交
105 106
}

N
neza2017 已提交
107
func (node *QueryNode) Init() error {
G
godchen 已提交
108
	ctx := context.Background()
X
xige-16 已提交
109
	registerReq := &queryPb.RegisterNodeRequest{
110 111 112
		Base: &commonpb.MsgBase{
			SourceID: Params.QueryNodeID,
		},
C
cai.zhang 已提交
113 114 115 116 117
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
118

T
ThreadDao 已提交
119
	resp, err := node.queryService.RegisterNode(ctx, registerReq)
C
cai.zhang 已提交
120 121 122
	if err != nil {
		panic(err)
	}
123
	if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
124 125 126 127 128 129 130 131 132 133 134 135 136 137
		panic(resp.Status.Reason)
	}

	for _, kv := range resp.InitParams.StartParams {
		switch kv.Key {
		case "StatsChannelName":
			Params.StatsChannelName = kv.Value
		case "TimeTickChannelName":
			Params.QueryTimeTickChannelName = kv.Value
		case "QueryChannelName":
			Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
		case "QueryResultChannelName":
			Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
		default:
S
sunby 已提交
138
			return fmt.Errorf("Invalid key: %v", kv.Key)
139
		}
C
cai.zhang 已提交
140 141
	}

B
bigsheeper 已提交
142
	log.Debug("", zap.Int64("QueryNodeID", Params.QueryNodeID))
C
cai.zhang 已提交
143

T
ThreadDao 已提交
144
	if node.masterService == nil {
B
bigsheeper 已提交
145
		log.Error("null master service detected")
146 147
	}

T
ThreadDao 已提交
148
	if node.indexService == nil {
B
bigsheeper 已提交
149
		log.Error("null index service detected")
150 151
	}

T
ThreadDao 已提交
152
	if node.dataService == nil {
B
bigsheeper 已提交
153
		log.Error("null data service detected")
154 155
	}

156 157 158 159
	return nil
}

func (node *QueryNode) Start() error {
G
groot 已提交
160 161 162 163 164 165 166 167 168 169
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = node.msFactory.SetParams(m)
	if err != nil {
		return err
	}

X
XuanYang-cn 已提交
170
	// init services and manager
G
groot 已提交
171
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
B
bigsheeper 已提交
172
	//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
G
groot 已提交
173

Z
zhenshan.cao 已提交
174
	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterService, node.dataService, node.indexService, node.replica)
G
groot 已提交
175
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
B
bigsheeper 已提交
176

X
XuanYang-cn 已提交
177
	// start services
N
neza2017 已提交
178
	go node.searchService.start()
B
bigsheeper 已提交
179
	//go node.metaService.start()
180
	go node.loadService.start()
X
XuanYang-cn 已提交
181
	go node.statsService.start()
G
godchen 已提交
182
	node.UpdateStateCode(internalpb.StateCode_Healthy)
N
neza2017 已提交
183
	return nil
B
bigsheeper 已提交
184
}
B
bigsheeper 已提交
185

N
neza2017 已提交
186
func (node *QueryNode) Stop() error {
G
godchen 已提交
187
	node.UpdateStateCode(internalpb.StateCode_Abnormal)
X
XuanYang-cn 已提交
188 189
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
190
	// free collectionReplica
X
XuanYang-cn 已提交
191
	node.replica.freeAll()
B
bigsheeper 已提交
192 193

	// close services
Z
zhenshan.cao 已提交
194 195 196 197
	for _, dsService := range node.dataSyncServices {
		if dsService != nil {
			dsService.close()
		}
B
bigsheeper 已提交
198 199
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
200
		node.searchService.close()
B
bigsheeper 已提交
201
	}
202 203
	if node.loadService != nil {
		node.loadService.close()
B
bigsheeper 已提交
204
	}
B
bigsheeper 已提交
205
	if node.statsService != nil {
X
XuanYang-cn 已提交
206
		node.statsService.close()
B
bigsheeper 已提交
207
	}
N
neza2017 已提交
208
	return nil
X
XuanYang-cn 已提交
209 210
}

G
godchen 已提交
211
func (node *QueryNode) UpdateStateCode(code internalpb.StateCode) {
212 213 214
	node.stateCode.Store(code)
}

T
ThreadDao 已提交
215
func (node *QueryNode) SetMasterService(master types.MasterService) error {
B
bigsheeper 已提交
216 217 218
	if master == nil {
		return errors.New("null master service interface")
	}
T
ThreadDao 已提交
219
	node.masterService = master
B
bigsheeper 已提交
220 221 222
	return nil
}

T
ThreadDao 已提交
223
func (node *QueryNode) SetQueryService(query types.QueryService) error {
224
	if query == nil {
B
bigsheeper 已提交
225
		return errors.New("null query service interface")
226
	}
T
ThreadDao 已提交
227
	node.queryService = query
228 229 230
	return nil
}

T
ThreadDao 已提交
231
func (node *QueryNode) SetIndexService(index types.IndexService) error {
232 233 234
	if index == nil {
		return errors.New("null index service interface")
	}
T
ThreadDao 已提交
235
	node.indexService = index
236 237 238
	return nil
}

T
ThreadDao 已提交
239
func (node *QueryNode) SetDataService(data types.DataService) error {
240 241 242
	if data == nil {
		return errors.New("null data service interface")
	}
T
ThreadDao 已提交
243
	node.dataService = data
244 245 246
	return nil
}

247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
// getDataSyncService returns the dataSyncService registered for collectionID.
// It returns an error when no entry exists for that collection. The lookup is
// done while holding dsServicesMu.
func (node *QueryNode) getDataSyncService(collectionID UniqueID) (*dataSyncService, error) {
	node.dsServicesMu.Lock()
	defer node.dsServicesMu.Unlock()
	ds, ok := node.dataSyncServices[collectionID]
	if !ok {
		// Format the ID with %d; fmt.Sprintln would append a newline to the
		// error text.
		return nil, fmt.Errorf("cannot find dataSyncService, collectionID = %d", collectionID)
	}
	return ds, nil
}

// addDataSyncService registers ds for collectionID. It returns an error and
// leaves the existing entry untouched when the collection already has a
// dataSyncService. The map is updated while holding dsServicesMu.
func (node *QueryNode) addDataSyncService(collectionID UniqueID, ds *dataSyncService) error {
	node.dsServicesMu.Lock()
	defer node.dsServicesMu.Unlock()
	if _, ok := node.dataSyncServices[collectionID]; ok {
		// Format the ID with %d; fmt.Sprintln would append a newline to the
		// error text.
		return fmt.Errorf("dataSyncService already exists, collectionID = %d", collectionID)
	}
	node.dataSyncServices[collectionID] = ds
	return nil
}

// removeDataSyncService drops the map entry for collectionID while holding
// dsServicesMu. It does not close the service, and it is a no-op when the
// collection has no entry.
func (node *QueryNode) removeDataSyncService(collectionID UniqueID) {
	node.dsServicesMu.Lock()
	defer node.dsServicesMu.Unlock()

	delete(node.dataSyncServices, collectionID)
}

G
godchen 已提交
273 274
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
275
		Status: &commonpb.Status{
276
			ErrorCode: commonpb.ErrorCode_Success,
277 278
		},
	}
G
godchen 已提交
279
	code, ok := node.stateCode.Load().(internalpb.StateCode)
C
cai.zhang 已提交
280
	if !ok {
281 282
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
283
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
284 285 286
			Reason:    errMsg,
		}
		return stats, errors.New(errMsg)
C
cai.zhang 已提交
287
	}
G
godchen 已提交
288
	info := &internalpb.ComponentInfo{
C
cai.zhang 已提交
289
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
290
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
291 292
		StateCode: code,
	}
293
	stats.State = info
C
cai.zhang 已提交
294 295 296
	return stats, nil
}

G
godchen 已提交
297 298 299 300 301 302 303 304
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.QueryTimeTickChannelName,
	}, nil
C
cai.zhang 已提交
305 306
}

G
godchen 已提交
307 308 309 310 311 312 313 314
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
C
cai.zhang 已提交
315 316
}

G
godchen 已提交
317
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
318 319 320
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
321
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
322 323 324 325 326 327 328 329 330
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
xige-16 已提交
331
	node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
X
Xiangyu Wang 已提交
332
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
333 334 335

	// add result channel
	producerChannels := []string{in.ResultChannelID}
X
xige-16 已提交
336
	node.searchService.searchResultMsgStream.AsProducer(producerChannels)
X
Xiangyu Wang 已提交
337
	log.Debug("querynode AsProducer: " + strings.Join(producerChannels, ", "))
X
XuanYang-cn 已提交
338 339

	status := &commonpb.Status{
340
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
341 342 343 344
	}
	return status, nil
}

G
godchen 已提交
345
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
346 347 348
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
349
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
350 351 352 353 354 355
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
356
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
357 358 359
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
360
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
361 362 363 364 365 366
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
367
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
368 369 370
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
371
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
372 373 374 375 376 377 378 379 380 381
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
Z
zhenshan.cao 已提交
382
	searchStream.AsConsumer(consumeChannels, consumeSubName)
X
XuanYang-cn 已提交
383 384 385 386

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
Z
zhenshan.cao 已提交
387
	resultStream.AsProducer(producerChannels)
X
XuanYang-cn 已提交
388 389

	status := &commonpb.Status{
390
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
391 392 393 394
	}
	return status, nil
}

G
godchen 已提交
395
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
Z
zhenshan.cao 已提交
396 397
	log.Debug("starting WatchDmChannels ...", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
	collectionID := in.CollectionID
398 399
	ds, err := node.getDataSyncService(collectionID)
	if err != nil || ds.dmStream == nil {
Z
zhenshan.cao 已提交
400
		errMsg := "null data sync service or null data manipulation stream, collectionID = " + fmt.Sprintln(collectionID)
X
XuanYang-cn 已提交
401
		status := &commonpb.Status{
402
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
403 404
			Reason:    errMsg,
		}
Z
zhenshan.cao 已提交
405
		log.Error(errMsg)
X
XuanYang-cn 已提交
406 407 408
		return status, errors.New(errMsg)
	}

409
	switch t := ds.dmStream.(type) {
Y
yukun 已提交
410
	case *ms.TtMsgStream:
G
groot 已提交
411 412
	default:
		_ = t
X
XuanYang-cn 已提交
413 414
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
415
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
XuanYang-cn 已提交
416 417
			Reason:    errMsg,
		}
Z
zhenshan.cao 已提交
418
		log.Error(errMsg)
X
XuanYang-cn 已提交
419 420 421
		return status, errors.New(errMsg)
	}

Z
zhenshan.cao 已提交
422 423 424 425 426
	getUniqueSubName := func() string {
		prefixName := Params.MsgChannelSubName
		return prefixName + "-" + strconv.FormatInt(collectionID, 10)
	}

X
XuanYang-cn 已提交
427 428
	// add request channel
	consumeChannels := in.ChannelIDs
Z
zhenshan.cao 已提交
429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
	toSeekInfo := make([]*internalpb.MsgPosition, 0)
	toDirSubChannels := make([]string, 0)

	consumeSubName := getUniqueSubName()

	for _, info := range in.Infos {
		if len(info.Pos.MsgID) == 0 {
			toDirSubChannels = append(toDirSubChannels, info.ChannelID)
			continue
		}
		info.Pos.MsgGroup = consumeSubName
		toSeekInfo = append(toSeekInfo, info.Pos)

		log.Debug("prevent inserting segments", zap.String("segmentIDs", fmt.Sprintln(info.ExcludedSegments)))
		err := node.replica.addExcludedSegments(collectionID, info.ExcludedSegments)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
			log.Error(err.Error())
			return status, err
		}
	}

454
	ds.dmStream.AsConsumer(toDirSubChannels, consumeSubName)
Z
zhenshan.cao 已提交
455
	for _, pos := range toSeekInfo {
456
		err := ds.dmStream.Seek(pos)
Z
zhenshan.cao 已提交
457 458 459 460 461 462 463 464 465 466
		if err != nil {
			errMsg := "msgStream seek error :" + err.Error()
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    errMsg,
			}
			log.Error(errMsg)
			return status, errors.New(errMsg)
		}
	}
X
Xiangyu Wang 已提交
467
	log.Debug("querynode AsConsumer: " + strings.Join(consumeChannels, ", ") + " : " + consumeSubName)
X
XuanYang-cn 已提交
468 469

	status := &commonpb.Status{
470
		ErrorCode: commonpb.ErrorCode_Success,
X
XuanYang-cn 已提交
471
	}
Z
zhenshan.cao 已提交
472
	log.Debug("WatchDmChannels done", zap.String("ChannelIDs", fmt.Sprintln(in.ChannelIDs)))
X
XuanYang-cn 已提交
473 474 475
	return status, nil
}

G
godchen 已提交
476
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
X
XuanYang-cn 已提交
477
	// TODO: support db
Z
zhenshan.cao 已提交
478
	collectionID := in.CollectionID
C
cai.zhang 已提交
479 480
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
481
	fieldIDs := in.FieldIDs
482
	schema := in.Schema
483

B
bigsheeper 已提交
484
	log.Debug("query node load segment", zap.String("loadSegmentRequest", fmt.Sprintln(in)))
X
xige-16 已提交
485 486

	status := &commonpb.Status{
487
		ErrorCode: commonpb.ErrorCode_Success,
X
xige-16 已提交
488
	}
489 490 491
	hasCollection := node.replica.hasCollection(collectionID)
	hasPartition := node.replica.hasPartition(partitionID)
	if !hasCollection {
Z
zhenshan.cao 已提交
492
		// loading init
493 494
		err := node.replica.addCollection(collectionID, schema)
		if err != nil {
495
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
496
			status.Reason = err.Error()
497 498
			return status, err
		}
Z
zhenshan.cao 已提交
499
		node.replica.initExcludedSegments(collectionID)
500 501 502 503 504 505 506 507 508 509
		newDS := newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory, collectionID)
		// ignore duplicated dataSyncService error
		node.addDataSyncService(collectionID, newDS)
		ds, err := node.getDataSyncService(collectionID)
		if err != nil {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
			return status, err
		}
		go ds.start()
B
bigsheeper 已提交
510
		node.searchService.startSearchCollection(collectionID)
511 512 513 514
	}
	if !hasPartition {
		err := node.replica.addPartition(collectionID, partitionID)
		if err != nil {
515
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
516
			status.Reason = err.Error()
517 518 519
			return status, err
		}
	}
520
	err := node.replica.enablePartition(partitionID)
C
cai.zhang 已提交
521
	if err != nil {
522
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
523 524 525 526 527 528 529 530
		status.Reason = err.Error()
		return status, err
	}

	if len(segmentIDs) == 0 {
		return status, nil
	}

Z
zhenshan.cao 已提交
531
	err = node.loadService.loadSegmentPassively(collectionID, partitionID, segmentIDs, fieldIDs)
X
xige-16 已提交
532
	if err != nil {
533
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
X
xige-16 已提交
534 535 536
		status.Reason = err.Error()
		return status, err
	}
Z
zhenshan.cao 已提交
537 538

	log.Debug("LoadSegments done", zap.String("segmentIDs", fmt.Sprintln(in.SegmentIDs)))
X
xige-16 已提交
539
	return status, nil
C
cai.zhang 已提交
540 541
}

G
godchen 已提交
542
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
543 544 545 546
	ds, err := node.getDataSyncService(in.CollectionID)
	if err == nil && ds != nil {
		ds.close()
		node.removeDataSyncService(in.CollectionID)
Z
zhenshan.cao 已提交
547 548 549 550
		node.replica.removeTSafe(in.CollectionID)
		node.replica.removeExcludedSegments(in.CollectionID)
	}

B
bigsheeper 已提交
551 552 553 554
	if node.searchService.hasSearchCollection(in.CollectionID) {
		node.searchService.stopSearchCollection(in.CollectionID)
	}

555
	err = node.replica.removeCollection(in.CollectionID)
B
bigsheeper 已提交
556 557
	if err != nil {
		status := &commonpb.Status{
558
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
B
bigsheeper 已提交
559 560 561 562 563
			Reason:    err.Error(),
		}
		return status, err
	}

Z
zhenshan.cao 已提交
564
	log.Debug("ReleaseCollection done", zap.Int64("collectionID", in.CollectionID))
B
bigsheeper 已提交
565
	return &commonpb.Status{
566
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
567 568 569
	}, nil
}

G
godchen 已提交
570
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
571
	status := &commonpb.Status{
572
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
573
	}
C
cai.zhang 已提交
574
	for _, id := range in.PartitionIDs {
B
bigsheeper 已提交
575
		err := node.loadService.segLoader.replica.removePartition(id)
C
cai.zhang 已提交
576
		if err != nil {
B
bigsheeper 已提交
577
			// not return, try to release all partitions
578
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
579
			status.Reason = err.Error()
C
cai.zhang 已提交
580 581
		}
	}
B
bigsheeper 已提交
582 583
	return status, nil
}
C
cai.zhang 已提交
584

G
godchen 已提交
585
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
B
bigsheeper 已提交
586
	status := &commonpb.Status{
587
		ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
588
	}
C
cai.zhang 已提交
589
	for _, id := range in.SegmentIDs {
B
bigsheeper 已提交
590 591 592
		err2 := node.loadService.segLoader.replica.removeSegment(id)
		if err2 != nil {
			// not return, try to release all segments
593
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
B
bigsheeper 已提交
594
			status.Reason = err2.Error()
X
XuanYang-cn 已提交
595 596
		}
	}
B
bigsheeper 已提交
597
	return status, nil
X
XuanYang-cn 已提交
598
}
B
bigsheeper 已提交
599

G
godchen 已提交
600
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
B
bigsheeper 已提交
601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617
	infos := make([]*queryPb.SegmentInfo, 0)
	for _, id := range in.SegmentIDs {
		segment, err := node.replica.getSegmentByID(id)
		if err != nil {
			continue
		}
		info := &queryPb.SegmentInfo{
			SegmentID:    segment.ID(),
			CollectionID: segment.collectionID,
			PartitionID:  segment.partitionID,
			MemSize:      segment.getMemSize(),
			NumRows:      segment.getRowCount(),
			IndexName:    segment.getIndexName(),
			IndexID:      segment.getIndexID(),
		}
		infos = append(infos, info)
	}
G
godchen 已提交
618
	return &queryPb.GetSegmentInfoResponse{
B
bigsheeper 已提交
619
		Status: &commonpb.Status{
620
			ErrorCode: commonpb.ErrorCode_Success,
B
bigsheeper 已提交
621 622 623 624
		},
		Infos: infos,
	}, nil
}