query_node.go 11.0 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
X
XuanYang-cn 已提交
17
	"errors"
18
	"fmt"
19
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
20
	"io"
C
cai.zhang 已提交
21
	"sync/atomic"
22 23 24

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
C
cai.zhang 已提交
25

X
Xiangyu Wang 已提交
26 27
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
28
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
C
cai.zhang 已提交
29
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
30
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
31
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
B
bigsheeper 已提交
32 33
)

X
xige-16 已提交
34 35
// Node is an alias for typeutil.QueryNodeInterface.
type Node = typeutil.QueryNodeInterface

// QueryService is an alias for typeutil.QueryServiceInterface.
type QueryService = typeutil.QueryServiceInterface
36

B
bigsheeper 已提交
37
type QueryNode struct {
38 39
	typeutil.Service

X
XuanYang-cn 已提交
40
	queryNodeLoopCtx    context.Context
41
	queryNodeLoopCancel context.CancelFunc
42

B
bigsheeper 已提交
43
	QueryNodeID uint64
C
cai.zhang 已提交
44
	stateCode   atomic.Value
B
bigsheeper 已提交
45

X
XuanYang-cn 已提交
46
	replica collectionReplica
B
bigsheeper 已提交
47

48
	// internal services
49 50 51 52 53
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
54

55 56
	segManager *segmentManager

57 58 59
	//opentracing
	tracer opentracing.Tracer
	closer io.Closer
B
bigsheeper 已提交
60
}
61

62
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
X
XuanYang-cn 已提交
63
	ctx1, cancel := context.WithCancel(ctx)
C
cai.zhang 已提交
64
	node := &QueryNode{
65 66 67 68 69 70 71 72
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
X
XuanYang-cn 已提交
73
		segManager:      nil,
74 75 76 77 78 79 80 81 82 83
	}

	var err error
	cfg := &config.Configuration{
		ServiceName: "query_node",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
C
cai.zhang 已提交
84
	node.tracer, node.closer, err = cfg.NewTracer()
85 86 87
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
C
cai.zhang 已提交
88
	opentracing.SetGlobalTracer(node.tracer)
X
XuanYang-cn 已提交
89

B
bigsheeper 已提交
90
	segmentsMap := make(map[int64]*Segment)
91
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
92

93 94
	tSafe := newTSafe()

C
cai.zhang 已提交
95
	node.replica = &collectionReplicaImpl{
G
godchen 已提交
96 97
		collections: collections,
		segments:    segmentsMap,
98 99

		tSafe: tSafe,
G
godchen 已提交
100
	}
C
cai.zhang 已提交
101 102 103
	node.stateCode.Store(internalpb2.StateCode_INITIALIZING)
	return node
}
G
godchen 已提交
104

C
cai.zhang 已提交
105 106 107
// TODO: delete this and call node.Init()
func Init() {
	Params.Init()
B
bigsheeper 已提交
108 109
}

N
neza2017 已提交
110
func (node *QueryNode) Init() error {
X
xige-16 已提交
111
	registerReq := &queryPb.RegisterNodeRequest{
C
cai.zhang 已提交
112 113 114 115 116
		Address: &commonpb.Address{
			Ip:   Params.QueryNodeIP,
			Port: Params.QueryNodePort,
		},
	}
X
xige-16 已提交
117
	var client QueryService // TODO: init interface
C
cai.zhang 已提交
118 119 120 121 122 123 124 125 126 127
	response, err := client.RegisterNode(registerReq)
	if err != nil {
		panic(err)
	}
	if response.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
		panic(response.Status.Reason)
	}
	// TODO: use response.initParams

	Params.Init()
N
neza2017 已提交
128
	return nil
C
cai.zhang 已提交
129 130
}

N
neza2017 已提交
131
func (node *QueryNode) Start() error {
X
XuanYang-cn 已提交
132
	// todo add connectMaster logic
X
XuanYang-cn 已提交
133
	// init services and manager
X
XuanYang-cn 已提交
134 135 136
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
137 138
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
C
cai.zhang 已提交
139
	node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.dataSyncService.dmStream, node.loadIndexService.loadIndexReqChan)
B
bigsheeper 已提交
140

X
XuanYang-cn 已提交
141
	// start services
142
	go node.dataSyncService.start()
N
neza2017 已提交
143
	go node.searchService.start()
B
bigsheeper 已提交
144
	go node.metaService.start()
145
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
146
	go node.statsService.start()
147

C
cai.zhang 已提交
148
	node.stateCode.Store(internalpb2.StateCode_HEALTHY)
149
	<-node.queryNodeLoopCtx.Done()
N
neza2017 已提交
150
	return nil
B
bigsheeper 已提交
151
}
B
bigsheeper 已提交
152

N
neza2017 已提交
153
func (node *QueryNode) Stop() error {
C
cai.zhang 已提交
154
	node.stateCode.Store(internalpb2.StateCode_ABNORMAL)
X
XuanYang-cn 已提交
155 156
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
157
	// free collectionReplica
X
XuanYang-cn 已提交
158
	node.replica.freeAll()
B
bigsheeper 已提交
159 160 161

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
162
		node.dataSyncService.close()
B
bigsheeper 已提交
163 164
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
165
		node.searchService.close()
B
bigsheeper 已提交
166
	}
B
bigsheeper 已提交
167 168 169
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
170
	if node.statsService != nil {
X
XuanYang-cn 已提交
171
		node.statsService.close()
B
bigsheeper 已提交
172
	}
173 174 175
	if node.closer != nil {
		node.closer.Close()
	}
N
neza2017 已提交
176
	return nil
X
XuanYang-cn 已提交
177 178
}

C
cai.zhang 已提交
179 180 181 182 183 184 185
func (node *QueryNode) GetComponentStates() (*internalpb2.ComponentStates, error) {
	code, ok := node.stateCode.Load().(internalpb2.StateCode)
	if !ok {
		return nil, errors.New("unexpected error in type assertion")
	}
	info := &internalpb2.ComponentInfo{
		NodeID:    Params.QueryNodeID,
X
XuanYang-cn 已提交
186
		Role:      typeutil.QueryNodeRole,
C
cai.zhang 已提交
187 188 189 190 191 192 193 194 195
		StateCode: code,
	}
	stats := &internalpb2.ComponentStates{
		State: info,
	}
	return stats, nil
}

func (node *QueryNode) GetTimeTickChannel() (string, error) {
N
neza2017 已提交
196
	return Params.QueryTimeTickChannelName, nil
C
cai.zhang 已提交
197 198 199 200 201 202
}

// GetStatisticsChannel returns Params.StatsChannelName. The error is always
// nil.
func (node *QueryNode) GetStatisticsChannel() (string, error) {
	channelName := Params.StatsChannelName
	return channelName, nil
}

X
XuanYang-cn 已提交
203 204 205 206 207 208 209 210 211 212 213
func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
214
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
215 216 217 218 219 220 221 222 223 224
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
225
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
240
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	// add result channel
	producerChannels := []string{in.ResultChannelID}
	resultStream.CreatePulsarProducers(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
264
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
265 266 267 268 269 270 271 272 273 274
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
275
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
276 277 278 279 280 281 282 283 284 285 286 287 288 289
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
290
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
	resultStream.CreatePulsarProducers(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
		errMsg := "null data sync service or null data manipulation stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
316
	fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329 330
	if !ok {
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := in.ChannelIDs
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
331
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
332 333 334 335 336 337 338 339 340 341
	fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
	// TODO: support db
Z
zhenshan.cao 已提交
342
	collectionID := in.CollectionID
C
cai.zhang 已提交
343 344
	partitionID := in.PartitionID
	segmentIDs := in.SegmentIDs
X
XuanYang-cn 已提交
345
	fieldIDs := in.FieldIDs
346

C
cai.zhang 已提交
347 348 349 350 351 352 353 354 355
	err := node.replica.enablePartitionDM(collectionID, partitionID)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
		}
		return status, err
	}

356 357 358
	// segments are ordered before LoadSegments calling
	if in.LastSegmentState.State == datapb.SegmentState_SegmentGrowing {
		segmentNum := len(segmentIDs)
C
cai.zhang 已提交
359 360 361 362 363 364 365 366 367
		positions := in.LastSegmentState.StartPositions
		err = node.segManager.seekSegment(positions)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
368 369 370
		segmentIDs = segmentIDs[:segmentNum-1]
	}

C
cai.zhang 已提交
371
	err = node.segManager.loadSegment(collectionID, partitionID, segmentIDs, fieldIDs)
C
cai.zhang 已提交
372 373 374 375
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    err.Error(),
Z
zhenshan.cao 已提交
376
		}
C
cai.zhang 已提交
377
		return status, err
Z
zhenshan.cao 已提交
378
	}
C
cai.zhang 已提交
379 380 381 382
	return nil, nil
}

func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
C
cai.zhang 已提交
383 384 385 386 387 388 389 390 391 392 393
	for _, id := range in.PartitionIDs {
		err := node.replica.enablePartitionDM(in.CollectionID, id)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}

C
cai.zhang 已提交
394 395 396
	// release all fields in the segments
	for _, id := range in.SegmentIDs {
		err := node.segManager.releaseSegment(id)
X
XuanYang-cn 已提交
397 398 399 400 401 402 403 404 405 406
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}
	return nil, nil
}