query_node.go 13.5 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
X
XuanYang-cn 已提交
17
	"errors"
18 19
	"fmt"
	"io"
20
	"log"
C
cai.zhang 已提交
21
	"sync/atomic"
22 23 24

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
C
cai.zhang 已提交
25

X
Xiangyu Wang 已提交
26 27
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
28
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
B
bigsheeper 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
C
cai.zhang 已提交
30
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
31
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
32
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
33 34
)

35 36 37 38 39 40 41 42 43 44
// Node is the contract exposed by a query node: the common component
// lifecycle from typeutil.Component plus query-channel, DM-channel and
// segment management calls.
type Node interface {
	typeutil.Component

	// Query (search request / search result) channels.
	AddQueryChannel(req *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
	RemoveQueryChannel(req *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)

	// Data manipulation channels.
	WatchDmChannels(req *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)

	// Segments.
	LoadSegments(req *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
	ReleaseSegments(req *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
}

X
xige-16 已提交
45
// QueryService is an alias for typeutil.QueryServiceInterface.
type (
	QueryService = typeutil.QueryServiceInterface
)
46

B
bigsheeper 已提交
47
type QueryNode struct {
48 49
	typeutil.Service

X
XuanYang-cn 已提交
50
	queryNodeLoopCtx    context.Context
51
	queryNodeLoopCancel context.CancelFunc
52

B
bigsheeper 已提交
53
	QueryNodeID uint64
C
cai.zhang 已提交
54
	stateCode   atomic.Value
B
bigsheeper 已提交
55

X
XuanYang-cn 已提交
56
	replica collectionReplica
B
bigsheeper 已提交
57

58
	// internal services
59 60 61 62 63
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
64

65 66
	segManager *segmentManager

67 68 69
	//opentracing
	tracer opentracing.Tracer
	closer io.Closer
70 71

	// clients
B
bigsheeper 已提交
72 73 74 75
	masterClient MasterServiceInterface
	queryClient  QueryServiceInterface
	indexClient  IndexServiceInterface
	dataClient   DataServiceInterface
B
bigsheeper 已提交
76
}
77

78
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
X
XuanYang-cn 已提交
79
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
80
	node := &QueryNode{
81 82 83 84 85 86 87 88
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
X
XuanYang-cn 已提交
89
		segManager:      nil,
90 91 92 93 94 95 96 97 98 99
	}

	var err error
	cfg := &config.Configuration{
		ServiceName: "query_node",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
C
cai.zhang 已提交
100
	node.tracer, node.closer, err = cfg.NewTracer()
101 102 103
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
C
cai.zhang 已提交
104
	opentracing.SetGlobalTracer(node.tracer)
X
XuanYang-cn 已提交
105

B
bigsheeper 已提交
106
	segmentsMap := make(map[int64]*Segment)
107
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
108

109 110
	tSafe := newTSafe()

C
cai.zhang 已提交
111
	node.replica = &collectionReplicaImpl{
G
godchen 已提交
112 113
		collections: collections,
		segments:    segmentsMap,
114 115

		tSafe: tSafe,
G
godchen 已提交
116
	}
C
cai.zhang 已提交
117 118 119
	node.stateCode.Store(internalpb2.StateCode_INITIALIZING)
	return node
}
G
godchen 已提交
120

121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
// NewQueryNodeWithoutID builds a QueryNode exactly as NewQueryNode does, but
// leaves QueryNodeID at zero. This includes the cancellable loop context, the
// global Jaeger tracer (panicking if it cannot be created), the empty replica
// and the INITIALIZING state.
//
// It delegates to NewQueryNode instead of duplicating its body, so the two
// constructors cannot drift apart.
func NewQueryNodeWithoutID(ctx context.Context) *QueryNode {
	return NewQueryNode(ctx, 0)
}

C
cai.zhang 已提交
163 164 165
// TODO: delete this and call node.Init()
func Init() {
	Params.Init()
B
bigsheeper 已提交
166 167
}

N
neza2017 已提交
168
func (node *QueryNode) Init() error {
169 170 171 172 173
	Params.Init()
	return nil
}

func (node *QueryNode) Start() error {
X
xige-16 已提交
174
	registerReq := &queryPb.RegisterNodeRequest{
C
cai.zhang 已提交
175 176 177 178 179
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
180 181

	response, err := node.queryClient.RegisterNode(registerReq)
C
cai.zhang 已提交
182 183 184 185 186 187 188
	if err != nil {
		panic(err)
	}
	if response.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
		panic(response.Status.Reason)
	}

189 190
	Params.QueryNodeID = response.InitParams.NodeID
	fmt.Println("QueryNodeID is", Params.QueryNodeID)
C
cai.zhang 已提交
191

192 193 194 195 196 197 198 199
	if node.indexClient == nil {
		log.Println("WARN: null index service detected")
	}

	if node.dataClient == nil {
		log.Println("WARN: null data service detected")
	}

X
XuanYang-cn 已提交
200
	// todo add connectMaster logic
X
XuanYang-cn 已提交
201
	// init services and manager
X
XuanYang-cn 已提交
202 203 204
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
205 206
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
B
bigsheeper 已提交
207
	node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan)
B
bigsheeper 已提交
208

X
XuanYang-cn 已提交
209
	// start services
210
	go node.dataSyncService.start()
N
neza2017 已提交
211
	go node.searchService.start()
B
bigsheeper 已提交
212
	go node.metaService.start()
213
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
214
	go node.statsService.start()
215

C
cai.zhang 已提交
216
	node.stateCode.Store(internalpb2.StateCode_HEALTHY)
217
	<-node.queryNodeLoopCtx.Done()
N
neza2017 已提交
218
	return nil
B
bigsheeper 已提交
219
}
B
bigsheeper 已提交
220

N
neza2017 已提交
221
func (node *QueryNode) Stop() error {
C
cai.zhang 已提交
222
	node.stateCode.Store(internalpb2.StateCode_ABNORMAL)
X
XuanYang-cn 已提交
223 224
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
225
	// free collectionReplica
X
XuanYang-cn 已提交
226
	node.replica.freeAll()
B
bigsheeper 已提交
227 228 229

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
230
		node.dataSyncService.close()
B
bigsheeper 已提交
231 232
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
233
		node.searchService.close()
B
bigsheeper 已提交
234
	}
B
bigsheeper 已提交
235 236 237
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
238
	if node.statsService != nil {
X
XuanYang-cn 已提交
239
		node.statsService.close()
B
bigsheeper 已提交
240
	}
241 242 243
	if node.closer != nil {
		node.closer.Close()
	}
N
neza2017 已提交
244
	return nil
X
XuanYang-cn 已提交
245 246
}

B
bigsheeper 已提交
247 248 249 250 251 252 253 254
// SetMasterService injects the master service client. Start passes it to the
// segment manager. A nil client is rejected and the stored client is left
// unchanged.
func (node *QueryNode) SetMasterService(master MasterServiceInterface) error {
	if master != nil {
		node.masterClient = master
		return nil
	}
	return errors.New("null master service interface")
}

255 256
func (node *QueryNode) SetQueryService(query QueryServiceInterface) error {
	if query == nil {
B
bigsheeper 已提交
257
		return errors.New("null query service interface")
258 259 260 261 262
	}
	node.queryClient = query
	return nil
}

263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
// SetIndexService injects the index service client. Start passes it to the
// segment manager. A nil client is rejected and the stored client is left
// unchanged.
func (node *QueryNode) SetIndexService(index IndexServiceInterface) error {
	if index != nil {
		node.indexClient = index
		return nil
	}
	return errors.New("null index service interface")
}

// SetDataService injects the data service client. Start passes it to the
// segment manager. A nil client is rejected and the stored client is left
// unchanged.
func (node *QueryNode) SetDataService(data DataServiceInterface) error {
	if data != nil {
		node.dataClient = data
		return nil
	}
	return errors.New("null data service interface")
}

C
cai.zhang 已提交
279 280 281 282 283 284 285
func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
	code, ok := node.stateCode.Load().(internalpb2.StateCode)
	if !ok {
		return nil, errors.New("unexpected error in type assertion")
	}
	info := &internalpb2.ComponentInfo{
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
286
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
287 288 289 290 291 292 293 294 295
		StateCode: code,
	}
	stats := &internalpb2.ComponentStates{
		State: info,
	}
	return stats, nil
}

func (node *QueryNode) GetTimeTickChannel() (string, error) {
N
neza2017 已提交
296
	return Params.QueryTimeTickChannelName, nil
C
cai.zhang 已提交
297 298 299 300 301 302
}

// GetStatisticsChannel returns the statistics channel name configured in
// Params.StatsChannelName. The error is always nil.
func (node *QueryNode) GetStatisticsChannel() (string, error) {
	channel := Params.StatsChannelName
	return channel, nil
}

X
XuanYang-cn 已提交
303 304 305 306 307 308 309 310 311 312 313
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
314
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
315 316 317 318 319 320 321 322 323 324
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
325
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
340
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	// add result channel
	producerChannels := []string{in.ResultChannelID}
	resultStream.CreatePulsarProducers(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
364
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
365 366 367 368 369 370 371 372 373 374
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
375
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388 389
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
390
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
	resultStream.CreatePulsarProducers(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
		errMsg := "null data sync service or null data manipulation stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
416
	fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
417 418 419 420 421 422 423 424 425 426 427 428 429 430
	if !ok {
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := in.ChannelIDs
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
431
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
432 433 434 435 436 437 438 439 440 441
	fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
	// TODO: support db
Z
zhenshan.cao 已提交
442
	collectionID := in.CollectionID
C
cai.zhang 已提交
443 444
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
445
	fieldIDs := in.FieldIDs
446

C
cai.zhang 已提交
447 448 449 450 451 452 453 454 455
	err := node.replica.enablePartitionDM(collectionID, partitionID)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
		}
		return status, err
	}

456 457 458
	// segments are ordered before LoadSegments calling
	if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing {
		segmentNum := len(segmentIDs)
C
cai.zhang 已提交
459 460 461 462 463 464 465 466 467
		positions := in.LastSegmentState.StartPositions
		err = node.segManager.seekSegment(positions)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
468 469 470
		segmentIDs = segmentIDs[:segmentNum-1]
	}

C
cai.zhang 已提交
471
	err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
C
cai.zhang 已提交
472 473 474 475
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
Z
zhenshan.cao 已提交
476
		}
C
cai.zhang 已提交
477
		return status, err
Z
zhenshan.cao 已提交
478
	}
C
cai.zhang 已提交
479 480 481 482
	return nil, nil
}

func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
C
cai.zhang 已提交
483 484 485 486 487 488 489 490 491 492 493
	for _, id := range in.PartitionIDs {
		err := node.replica.enablePartitionDM(in.CollectionID, id)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}

C
cai.zhang 已提交
494 495 496
	// release all fields in the segments
	for _, id := range in.SegmentIDs {
		err := node.segManager.releaseSegment(id)
X
XuanYang-cn 已提交
497 498 499 500 501 502 503 504 505 506
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}
	return nil, nil
}