query_node.go 2.3 KB
Newer Older
N
neza2017 已提交
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
B
bigsheeper 已提交
17 18 19
)

type QueryNode struct {
X
XuanYang-cn 已提交
20
	queryNodeLoopCtx    context.Context
21
	queryNodeLoopCancel context.CancelFunc
22

B
bigsheeper 已提交
23
	QueryNodeID uint64
B
bigsheeper 已提交
24

X
XuanYang-cn 已提交
25
	replica collectionReplica
B
bigsheeper 已提交
26

27 28 29 30 31 32
	// services
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
B
bigsheeper 已提交
33
}
34

X
XuanYang-cn 已提交
35 36 37 38
func Init() {
	Params.Init()
}

D
dragondriver 已提交
39
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
X
XuanYang-cn 已提交
40 41 42

	ctx1, cancel := context.WithCancel(ctx)

B
bigsheeper 已提交
43
	segmentsMap := make(map[int64]*Segment)
44
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
45

46 47
	tSafe := newTSafe()

Q
quicksilver 已提交
48
	var replica collectionReplica = &collectionReplicaImpl{
G
godchen 已提交
49 50
		collections: collections,
		segments:    segmentsMap,
51 52

		tSafe: tSafe,
G
godchen 已提交
53 54
	}

Q
quicksilver 已提交
55 56 57 58 59 60 61 62 63 64 65 66
	return &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		replica: replica,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
	}
B
bigsheeper 已提交
67 68
}

X
XuanYang-cn 已提交
69 70 71 72 73
func (node *QueryNode) Start() error {
	// todo add connectMaster logic
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
74 75
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
B
bigsheeper 已提交
76

77
	go node.dataSyncService.start()
N
neza2017 已提交
78
	go node.searchService.start()
B
bigsheeper 已提交
79
	go node.metaService.start()
80
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
81
	go node.statsService.start()
82 83

	<-node.queryNodeLoopCtx.Done()
X
XuanYang-cn 已提交
84
	return nil
B
bigsheeper 已提交
85
}
B
bigsheeper 已提交
86

B
bigsheeper 已提交
87
func (node *QueryNode) Close() {
X
XuanYang-cn 已提交
88 89
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
90
	// free collectionReplica
X
XuanYang-cn 已提交
91
	node.replica.freeAll()
B
bigsheeper 已提交
92 93 94

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
95
		node.dataSyncService.close()
B
bigsheeper 已提交
96 97
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
98
		node.searchService.close()
B
bigsheeper 已提交
99
	}
Q
quicksilver 已提交
100 101 102
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
103
	if node.statsService != nil {
X
XuanYang-cn 已提交
104
		node.statsService.close()
B
bigsheeper 已提交
105
	}
R
rain 已提交
106
}