query_node.go 15.1 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
17
	"fmt"
18 19
	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
20
	"io"
21
	"log"
C
cai.zhang 已提交
22
	"sync/atomic"
23

24
	"github.com/zilliztech/milvus-distributed/internal/errors"
G
groot 已提交
25
	"github.com/zilliztech/milvus-distributed/internal/msgstream"
X
Xiangyu Wang 已提交
26
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
G
groot 已提交
27
	"github.com/zilliztech/milvus-distributed/internal/msgstream/rmqms"
28
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
C
cai.zhang 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
30
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
31
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
32 33
)

34 35 36 37 38 39 40
type Node interface {
	typeutil.Component

	AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
	RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
	WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
	LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
B
bigsheeper 已提交
41 42
	ReleaseCollection(in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error)
	ReleasePartitions(in *queryPb.ReleasePartitionRequest) (*commonpb.Status, error)
43
	ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
B
bigsheeper 已提交
44
	GetSegmentInfo(in *queryPb.SegmentInfoRequest) (*queryPb.SegmentInfoResponse, error)
45 46
}

X
xige-16 已提交
47
// QueryService is an alias for typeutil.QueryServiceInterface.
type QueryService = typeutil.QueryServiceInterface
48

B
bigsheeper 已提交
49
type QueryNode struct {
50 51
	typeutil.Service

X
XuanYang-cn 已提交
52
	queryNodeLoopCtx    context.Context
53
	queryNodeLoopCancel context.CancelFunc
54

55
	QueryNodeID UniqueID
C
cai.zhang 已提交
56
	stateCode   atomic.Value
B
bigsheeper 已提交
57

X
XuanYang-cn 已提交
58
	replica collectionReplica
B
bigsheeper 已提交
59

60
	// internal services
61 62 63 64 65
	dataSyncService *dataSyncService
	metaService     *metaService
	searchService   *searchService
	loadService     *loadService
	statsService    *statsService
66

67 68
	//opentracing
	closer io.Closer
69 70

	// clients
B
bigsheeper 已提交
71 72 73 74
	masterClient MasterServiceInterface
	queryClient  QueryServiceInterface
	indexClient  IndexServiceInterface
	dataClient   DataServiceInterface
G
groot 已提交
75 76

	msFactory msgstream.Factory
B
bigsheeper 已提交
77
}
78

79
func NewQueryNode(ctx context.Context, queryNodeID UniqueID, factory msgstream.Factory) *QueryNode {
X
XuanYang-cn 已提交
80
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
81
	node := &QueryNode{
82 83 84 85 86 87 88 89
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
G
groot 已提交
90 91

		msFactory: factory,
92 93
	}

B
bigsheeper 已提交
94
	node.replica = newCollectionReplicaImpl()
95
	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
C
cai.zhang 已提交
96 97
	return node
}
G
godchen 已提交
98

G
groot 已提交
99
func NewQueryNodeWithoutID(ctx context.Context, factory msgstream.Factory) *QueryNode {
100 101 102 103 104 105 106 107 108
	ctx1, cancel := context.WithCancel(ctx)
	node := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
G
groot 已提交
109 110

		msFactory: factory,
111 112
	}

B
bigsheeper 已提交
113
	node.replica = newCollectionReplicaImpl()
114
	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
115

116
	return node
B
bigsheeper 已提交
117 118
}

N
neza2017 已提交
119
func (node *QueryNode) Init() error {
X
xige-16 已提交
120
	registerReq := &queryPb.RegisterNodeRequest{
121 122 123 124
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_kNone,
			SourceID: Params.QueryNodeID,
		},
C
cai.zhang 已提交
125 126 127 128 129
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
130

131
	resp, err := node.queryClient.RegisterNode(registerReq)
C
cai.zhang 已提交
132 133 134
	if err != nil {
		panic(err)
	}
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
	if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
		panic(resp.Status.Reason)
	}

	for _, kv := range resp.InitParams.StartParams {
		switch kv.Key {
		case "StatsChannelName":
			Params.StatsChannelName = kv.Value
		case "TimeTickChannelName":
			Params.QueryTimeTickChannelName = kv.Value
		case "QueryChannelName":
			Params.SearchChannelNames = append(Params.SearchChannelNames, kv.Value)
		case "QueryResultChannelName":
			Params.SearchResultChannelNames = append(Params.SearchResultChannelNames, kv.Value)
		default:
			return errors.Errorf("Invalid key: %v", kv.Key)
		}
C
cai.zhang 已提交
152 153
	}

154
	fmt.Println("QueryNodeID is", Params.QueryNodeID)
C
cai.zhang 已提交
155

156 157 158 159 160 161 162 163 164 165 166 167 168 169
	cfg := &config.Configuration{
		ServiceName: fmt.Sprintf("query_node_%d", node.QueryNodeID),
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
	tracer, closer, err := cfg.NewTracer()
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(tracer)
	node.closer = closer

170 171 172 173
	if node.masterClient == nil {
		log.Println("WARN: null master service detected")
	}

174 175 176 177 178 179 180 181
	if node.indexClient == nil {
		log.Println("WARN: null index service detected")
	}

	if node.dataClient == nil {
		log.Println("WARN: null data service detected")
	}

182 183 184 185
	return nil
}

func (node *QueryNode) Start() error {
G
groot 已提交
186 187 188 189 190 191 192 193 194 195
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = node.msFactory.SetParams(m)
	if err != nil {
		return err
	}

X
XuanYang-cn 已提交
196
	// init services and manager
G
groot 已提交
197 198
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica, node.msFactory)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica, node.msFactory)
B
bigsheeper 已提交
199
	//node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
G
groot 已提交
200

201
	node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
G
groot 已提交
202
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan, node.msFactory)
B
bigsheeper 已提交
203

X
XuanYang-cn 已提交
204
	// start services
205
	go node.dataSyncService.start()
N
neza2017 已提交
206
	go node.searchService.start()
B
bigsheeper 已提交
207
	//go node.metaService.start()
208
	go node.loadService.start()
X
XuanYang-cn 已提交
209
	go node.statsService.start()
210
	node.UpdateStateCode(internalpb2.StateCode_HEALTHY)
N
neza2017 已提交
211
	return nil
B
bigsheeper 已提交
212
}
B
bigsheeper 已提交
213

N
neza2017 已提交
214
func (node *QueryNode) Stop() error {
215
	node.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
X
XuanYang-cn 已提交
216 217
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
218
	// free collectionReplica
X
XuanYang-cn 已提交
219
	node.replica.freeAll()
B
bigsheeper 已提交
220 221 222

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
223
		node.dataSyncService.close()
B
bigsheeper 已提交
224 225
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
226
		node.searchService.close()
B
bigsheeper 已提交
227
	}
228 229
	if node.loadService != nil {
		node.loadService.close()
B
bigsheeper 已提交
230
	}
B
bigsheeper 已提交
231
	if node.statsService != nil {
X
XuanYang-cn 已提交
232
		node.statsService.close()
B
bigsheeper 已提交
233
	}
234 235 236
	if node.closer != nil {
		node.closer.Close()
	}
N
neza2017 已提交
237
	return nil
X
XuanYang-cn 已提交
238 239
}

240 241 242 243
// UpdateStateCode atomically stores code as the node's current state code.
func (node *QueryNode) UpdateStateCode(code internalpb2.StateCode) {
	node.stateCode.Store(code)
}

B
bigsheeper 已提交
244 245 246 247 248 249 250 251
// SetMasterService stores master as the node's master service client.
// It returns an error, and leaves the current client untouched, when master
// is a nil interface value.
func (node *QueryNode) SetMasterService(master MasterServiceInterface) error {
	if master == nil {
		return errors.New("null master service interface")
	}
	node.masterClient = master
	return nil
}

252 253
func (node *QueryNode) SetQueryService(query QueryServiceInterface) error {
	if query == nil {
B
bigsheeper 已提交
254
		return errors.New("null query service interface")
255 256 257 258 259
	}
	node.queryClient = query
	return nil
}

260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
// SetIndexService stores index as the node's index service client.
// It returns an error, and leaves the current client untouched, when index
// is a nil interface value.
func (node *QueryNode) SetIndexService(index IndexServiceInterface) error {
	if index == nil {
		return errors.New("null index service interface")
	}
	node.indexClient = index
	return nil
}

// SetDataService stores data as the node's data service client.
// It returns an error, and leaves the current client untouched, when data
// is a nil interface value.
func (node *QueryNode) SetDataService(data DataServiceInterface) error {
	if data == nil {
		return errors.New("null data service interface")
	}
	node.dataClient = data
	return nil
}

C
cai.zhang 已提交
276
func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
277 278 279 280 281
	stats := &internalpb2.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
	}
C
cai.zhang 已提交
282 283
	code, ok := node.stateCode.Load().(internalpb2.StateCode)
	if !ok {
284 285 286 287 288 289
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}
		return stats, errors.New(errMsg)
C
cai.zhang 已提交
290 291 292
	}
	info := &internalpb2.ComponentInfo{
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
293
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
294 295
		StateCode: code,
	}
296
	stats.State = info
C
cai.zhang 已提交
297 298 299 300
	return stats, nil
}

func (node *QueryNode) GetTimeTickChannel() (string, error) {
N
neza2017 已提交
301
	return Params.QueryTimeTickChannelName, nil
C
cai.zhang 已提交
302 303 304 305 306 307
}

// GetStatisticsChannel returns Params.StatsChannelName, which Init sets from
// the "StatsChannelName" start parameter. The error is always nil.
func (node *QueryNode) GetStatisticsChannel() (string, error) {
	return Params.StatsChannelName, nil
}

X
XuanYang-cn 已提交
308 309 310 311 312 313 314 315 316 317 318 319 320 321
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
xige-16 已提交
322
	node.searchService.searchMsgStream.AsConsumer(consumeChannels, consumeSubName)
X
XuanYang-cn 已提交
323 324 325

	// add result channel
	producerChannels := []string{in.ResultChannelID}
X
xige-16 已提交
326
	node.searchService.searchResultMsgStream.AsProducer(producerChannels)
X
XuanYang-cn 已提交
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
345
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
346 347 348 349 350 351 352 353 354 355
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
356
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
357 358 359 360 361 362 363 364 365 366 367 368 369 370
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
Z
zhenshan.cao 已提交
371
	searchStream.AsConsumer(consumeChannels, consumeSubName)
X
XuanYang-cn 已提交
372 373 374 375

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
Z
zhenshan.cao 已提交
376
	resultStream.AsProducer(producerChannels)
X
XuanYang-cn 已提交
377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
		errMsg := "null data sync service or null data manipulation stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

G
groot 已提交
395 396 397 398 399
	switch t := node.dataSyncService.dmStream.(type) {
	case *pulsarms.PulsarTtMsgStream:
	case *rmqms.RmqTtMsgStream:
	default:
		_ = t
X
XuanYang-cn 已提交
400 401 402 403 404 405 406 407 408 409 410 411
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	consumeChannels := in.ChannelIDs
	consumeSubName := Params.MsgChannelSubName
G
groot 已提交
412
	node.dataSyncService.dmStream.AsConsumer(consumeChannels, consumeSubName)
X
XuanYang-cn 已提交
413 414 415 416 417 418 419 420 421

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
	// TODO: support db
Z
zhenshan.cao 已提交
422
	collectionID := in.CollectionID
C
cai.zhang 已提交
423 424
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
425
	fieldIDs := in.FieldIDs
426
	schema := in.Schema
427

428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
	hasCollection := node.replica.hasCollection(collectionID)
	hasPartition := node.replica.hasPartition(partitionID)
	if !hasCollection {
		err := node.replica.addCollection(collectionID, schema)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}
	if !hasPartition {
		err := node.replica.addPartition(collectionID, partitionID)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}
450
	err := node.replica.enablePartition(partitionID)
C
cai.zhang 已提交
451 452 453 454 455 456 457 458
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
		}
		return status, err
	}

459
	// segments are ordered before LoadSegments calling
460 461 462
	for i, state := range in.SegmentStates {
		if state.State == commonpb.SegmentState_SegmentGrowing {
			position := state.StartPosition
463
			err := node.loadService.segLoader.seekSegment(position)
464 465 466 467 468 469
			if err != nil {
				status := &commonpb.Status{
					ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
					Reason:    err.Error(),
				}
				return status, err
C
cai.zhang 已提交
470
			}
471 472
			segmentIDs = segmentIDs[:i]
			break
C
cai.zhang 已提交
473
		}
474 475
	}

476
	err = node.loadService.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
C
cai.zhang 已提交
477 478 479 480
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
Z
zhenshan.cao 已提交
481
		}
C
cai.zhang 已提交
482
		return status, err
Z
zhenshan.cao 已提交
483
	}
484 485 486
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}, nil
C
cai.zhang 已提交
487 488
}

B
bigsheeper 已提交
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
// ReleaseCollection removes the collection named in the request from the
// node's replica. On failure it returns an UNEXPECTED_ERROR status carrying
// the error text together with the error; otherwise a SUCCESS status.
func (node *QueryNode) ReleaseCollection(in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	if removeErr := node.replica.removeCollection(in.CollectionID); removeErr != nil {
		failed := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    removeErr.Error(),
		}
		return failed, removeErr
	}

	succeeded := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return succeeded, nil
}

func (node *QueryNode) ReleasePartitions(in *queryPb.ReleasePartitionRequest) (*commonpb.Status, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
C
cai.zhang 已提交
508
	for _, id := range in.PartitionIDs {
B
bigsheeper 已提交
509
		err := node.loadService.segLoader.replica.removePartition(id)
C
cai.zhang 已提交
510
		if err != nil {
B
bigsheeper 已提交
511 512 513
			// not return, try to release all partitions
			status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
			status.Reason = err.Error()
C
cai.zhang 已提交
514 515
		}
	}
B
bigsheeper 已提交
516 517
	return status, nil
}
C
cai.zhang 已提交
518

B
bigsheeper 已提交
519 520 521 522
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
C
cai.zhang 已提交
523
	for _, id := range in.SegmentIDs {
B
bigsheeper 已提交
524 525 526 527 528
		err2 := node.loadService.segLoader.replica.removeSegment(id)
		if err2 != nil {
			// not return, try to release all segments
			status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
			status.Reason = err2.Error()
X
XuanYang-cn 已提交
529 530
		}
	}
B
bigsheeper 已提交
531
	return status, nil
X
XuanYang-cn 已提交
532
}
B
bigsheeper 已提交
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558

// GetSegmentInfo collects, for every requested segment ID found in the
// node's replica, its IDs, memory size, row count and index name/ID.
// Segments that cannot be looked up are left out of the response. The
// status is always SUCCESS and the returned error is always nil.
func (node *QueryNode) GetSegmentInfo(in *queryPb.SegmentInfoRequest) (*queryPb.SegmentInfoResponse, error) {
	segmentInfos := make([]*queryPb.SegmentInfo, 0, len(in.SegmentIDs))
	for _, segmentID := range in.SegmentIDs {
		seg, lookupErr := node.replica.getSegmentByID(segmentID)
		if lookupErr != nil {
			// unknown segment: skip it, do not fail the whole request
			continue
		}
		segmentInfos = append(segmentInfos, &queryPb.SegmentInfo{
			SegmentID:    seg.ID(),
			CollectionID: seg.collectionID,
			PartitionID:  seg.partitionID,
			MemSize:      seg.getMemSize(),
			NumRows:      seg.getRowCount(),
			IndexName:    seg.getIndexName(),
			IndexID:      seg.getIndexID(),
		})
	}

	resp := &queryPb.SegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Infos: segmentInfos,
	}
	return resp, nil
}