query_node.go 2.8 KB
Newer Older
N
neza2017 已提交
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
17 18 19 20 21
	"fmt"
	"io"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
B
bigsheeper 已提交
22 23 24
)

type QueryNode struct {
X
XuanYang-cn 已提交
25
	queryNodeLoopCtx    context.Context
26
	queryNodeLoopCancel context.CancelFunc
27

B
bigsheeper 已提交
28
	QueryNodeID uint64
B
bigsheeper 已提交
29

X
XuanYang-cn 已提交
30
	replica collectionReplica
B
bigsheeper 已提交
31

32 33 34 35 36 37
	// services
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
38 39 40 41

	//opentracing
	tracer opentracing.Tracer
	closer io.Closer
B
bigsheeper 已提交
42
}
43

X
XuanYang-cn 已提交
44 45 46 47
// Init initializes the package-level Params by calling Params.Init.
func Init() {
	Params.Init()
}

D
dragondriver 已提交
48
func NewQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
X
XuanYang-cn 已提交
49 50

	ctx1, cancel := context.WithCancel(ctx)
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	q := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
	}

	var err error
	cfg := &config.Configuration{
		ServiceName: "query_node",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
G
godchen 已提交
70
	q.tracer, q.closer, err = cfg.NewTracer()
71 72 73 74
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(q.tracer)
X
XuanYang-cn 已提交
75

B
bigsheeper 已提交
76
	segmentsMap := make(map[int64]*Segment)
77
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
78

79 80
	tSafe := newTSafe()

81
	q.replica = &collectionReplicaImpl{
G
godchen 已提交
82 83
		collections: collections,
		segments:    segmentsMap,
84 85

		tSafe: tSafe,
G
godchen 已提交
86 87
	}

88
	return q
B
bigsheeper 已提交
89 90
}

X
XuanYang-cn 已提交
91 92 93 94 95
func (node *QueryNode) Start() error {
	// todo add connectMaster logic
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
96 97
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
B
bigsheeper 已提交
98

99
	go node.dataSyncService.start()
N
neza2017 已提交
100
	go node.searchService.start()
B
bigsheeper 已提交
101
	go node.metaService.start()
102
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
103
	go node.statsService.start()
104 105

	<-node.queryNodeLoopCtx.Done()
X
XuanYang-cn 已提交
106
	return nil
B
bigsheeper 已提交
107
}
B
bigsheeper 已提交
108

B
bigsheeper 已提交
109
func (node *QueryNode) Close() {
X
XuanYang-cn 已提交
110 111
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
112
	// free collectionReplica
X
XuanYang-cn 已提交
113
	node.replica.freeAll()
B
bigsheeper 已提交
114 115 116

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
117
		node.dataSyncService.close()
B
bigsheeper 已提交
118 119
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
120
		node.searchService.close()
B
bigsheeper 已提交
121
	}
B
bigsheeper 已提交
122 123 124
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
125
	if node.statsService != nil {
X
XuanYang-cn 已提交
126
		node.statsService.close()
B
bigsheeper 已提交
127
	}
128 129 130 131
	if node.closer != nil {
		node.closer.Close()
	}

R
rain 已提交
132
}