query_node.go 9.7 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
X
XuanYang-cn 已提交
17
	"errors"
18 19 20 21 22
	"fmt"
	"io"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
X
Xiangyu Wang 已提交
23 24
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"
25 26
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
B
bigsheeper 已提交
27 28
)

29 30 31 32
type Node interface {
	Start() error
	Close()

33 34 35 36 37 38
	AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
	RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
	WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
	LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
	ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
	GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error)
39 40
}

B
bigsheeper 已提交
41
type QueryNode struct {
X
XuanYang-cn 已提交
42
	queryNodeLoopCtx    context.Context
43
	queryNodeLoopCancel context.CancelFunc
44

B
bigsheeper 已提交
45
	QueryNodeID uint64
B
bigsheeper 已提交
46

X
XuanYang-cn 已提交
47
	replica collectionReplica
B
bigsheeper 已提交
48

49
	// internal services
50 51 52 53 54
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
55

56 57
	segManager *segmentManager

58 59 60
	//opentracing
	tracer opentracing.Tracer
	closer io.Closer
B
bigsheeper 已提交
61
}
62

X
XuanYang-cn 已提交
63 64 65 66
// Init initializes the package-level Params by calling Params.Init.
func Init() {
	Params.Init()
}

67 68 69 70 71 72
// NewQueryNode builds a query node for the given ID and returns it behind the
// Node interface. ctx is the parent of the node's internal loop context.
func NewQueryNode(ctx context.Context, queryNodeID uint64) Node {
	return newQueryNode(ctx, queryNodeID)
}

func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
X
XuanYang-cn 已提交
73
	ctx1, cancel := context.WithCancel(ctx)
74 75 76 77 78 79 80 81 82
	q := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
X
XuanYang-cn 已提交
83
		segManager:      nil,
84 85 86 87 88 89 90 91 92 93
	}

	var err error
	cfg := &config.Configuration{
		ServiceName: "query_node",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
S
sunby 已提交
94
	q.tracer, q.closer, err = cfg.NewTracer()
95 96 97 98
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(q.tracer)
X
XuanYang-cn 已提交
99

B
bigsheeper 已提交
100
	segmentsMap := make(map[int64]*Segment)
101
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
102

103 104
	tSafe := newTSafe()

105
	q.replica = &collectionReplicaImpl{
G
godchen 已提交
106 107
		collections: collections,
		segments:    segmentsMap,
108 109

		tSafe: tSafe,
G
godchen 已提交
110 111
	}

112
	return q
B
bigsheeper 已提交
113 114
}

X
XuanYang-cn 已提交
115 116
func (node *QueryNode) Start() error {
	// todo add connectMaster logic
X
XuanYang-cn 已提交
117
	// init services and manager
X
XuanYang-cn 已提交
118 119 120
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
121 122
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
X
XuanYang-cn 已提交
123
	node.segManager = newSegmentManager(node.queryNodeLoopCtx, node.replica, node.loadIndexService.loadIndexReqChan)
B
bigsheeper 已提交
124

X
XuanYang-cn 已提交
125
	// start services
126
	go node.dataSyncService.start()
N
neza2017 已提交
127
	go node.searchService.start()
B
bigsheeper 已提交
128
	go node.metaService.start()
129
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
130
	go node.statsService.start()
131 132

	<-node.queryNodeLoopCtx.Done()
X
XuanYang-cn 已提交
133
	return nil
B
bigsheeper 已提交
134
}
B
bigsheeper 已提交
135

B
bigsheeper 已提交
136
func (node *QueryNode) Close() {
X
XuanYang-cn 已提交
137 138
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
139
	// free collectionReplica
X
XuanYang-cn 已提交
140
	node.replica.freeAll()
B
bigsheeper 已提交
141 142 143

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
144
		node.dataSyncService.close()
B
bigsheeper 已提交
145 146
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
147
		node.searchService.close()
B
bigsheeper 已提交
148
	}
B
bigsheeper 已提交
149 150 151
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
152
	if node.statsService != nil {
X
XuanYang-cn 已提交
153
		node.statsService.close()
B
bigsheeper 已提交
154
	}
155 156 157
	if node.closer != nil {
		node.closer.Close()
	}
X
XuanYang-cn 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170
}

func (node *QueryNode) AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
171
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
172 173 174 175 176 177 178 179 180 181
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
182
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
183 184 185 186 187 188 189 190 191 192 193 194 195 196
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
197
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	// add result channel
	producerChannels := []string{in.ResultChannelID}
	resultStream.CreatePulsarProducers(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error) {
	if node.searchService == nil || node.searchService.searchMsgStream == nil {
		errMsg := "null search service or null search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
221
	searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
222 223 224 225 226 227 228 229 230 231
	if !ok {
		errMsg := "type assertion failed for search message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
232
	resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
233 234 235 236 237 238 239 240 241 242 243 244 245 246
	if !ok {
		errMsg := "type assertion failed for search result message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// remove request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := []string{in.RequestChannelID}
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
247
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
	// TODO: searchStream.RemovePulsarConsumers(producerChannels)
	searchStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	// remove result channel
	producerChannels := []string{in.ResultChannelID}
	// TODO: resultStream.RemovePulsarProducer(producerChannels)
	resultStream.CreatePulsarProducers(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	if node.dataSyncService == nil || node.dataSyncService.dmStream == nil {
		errMsg := "null data sync service or null data manipulation stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

X
Xiangyu Wang 已提交
273
	fgDMMsgStream, ok := node.dataSyncService.dmStream.(*pulsarms.PulsarMsgStream)
X
XuanYang-cn 已提交
274 275 276 277 278 279 280 281 282 283 284 285 286 287
	if !ok {
		errMsg := "type assertion failed for dm message stream"
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    errMsg,
		}

		return status, errors.New(errMsg)
	}

	// add request channel
	pulsarBufSize := Params.SearchPulsarBufSize
	consumeChannels := in.ChannelIDs
	consumeSubName := Params.MsgChannelSubName
X
Xiangyu Wang 已提交
288
	unmarshalDispatcher := util.NewUnmarshalDispatcher()
X
XuanYang-cn 已提交
289 290 291 292 293 294 295 296 297 298
	fgDMMsgStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}
	return status, nil
}

func (node *QueryNode) LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error) {
	// TODO: support db
Z
zhenshan.cao 已提交
299 300
	partitionID := in.PartitionID
	collectionID := in.CollectionID
X
XuanYang-cn 已提交
301
	fieldIDs := in.FieldIDs
Z
zhenshan.cao 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
	// TODO: interim solution
	if len(fieldIDs) == 0 {
		collection, err := node.replica.getCollectionByID(collectionID)
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
		fieldIDs = make([]int64, 0)
		for _, field := range collection.Schema().Fields {
			fieldIDs = append(fieldIDs, field.FieldID)
		}
	}
X
XuanYang-cn 已提交
317
	for _, segmentID := range in.SegmentIDs {
Z
zhenshan.cao 已提交
318 319
		indexID := UniqueID(0) // TODO: get index id from master
		err := node.segManager.loadSegment(segmentID, partitionID, collectionID, &fieldIDs)
X
XuanYang-cn 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
		if err != nil {
			// TODO: return or continue?
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
		err = node.segManager.loadIndex(segmentID, indexID)
		if err != nil {
			// TODO: return or continue?
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
				Reason:    err.Error(),
			}
			return status, err
		}
	}

	return nil, nil
}

// ReleaseSegments is not implemented yet: it ignores the request and returns a
// nil status with a nil error.
func (node *QueryNode) ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error) {
	// TODO: implement
	return nil, nil
}
346

X
XuanYang-cn 已提交
347 348 349
func (node *QueryNode) GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error) {
	// TODO: implement
	return nil, nil
R
rain 已提交
350
}