query_node.go 3.6 KB
Newer Older
1
package querynode
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
17 18 19 20 21
	"fmt"
	"io"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
N
neza2017 已提交
22 23
	"google.golang.org/grpc"

24 25
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
B
bigsheeper 已提交
26 27
)

28 29 30 31
type Node interface {
	Start() error
	Close()

32 33 34 35 36 37
	AddQueryChannel(in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
	RemoveQueryChannel(in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
	WatchDmChannels(in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
	LoadSegments(in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
	ReleaseSegments(in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
	GetPartitionState(in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error)
38 39
}

B
bigsheeper 已提交
40
type QueryNode struct {
X
XuanYang-cn 已提交
41
	queryNodeLoopCtx    context.Context
42
	queryNodeLoopCancel context.CancelFunc
43

B
bigsheeper 已提交
44
	QueryNodeID uint64
N
neza2017 已提交
45
	grpcServer  *grpc.Server
B
bigsheeper 已提交
46

X
XuanYang-cn 已提交
47
	replica collectionReplica
B
bigsheeper 已提交
48

49
	// internal services
50 51 52 53 54
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
55

56 57
	segManager *segmentManager

58 59 60
	//opentracing
	tracer opentracing.Tracer
	closer io.Closer
B
bigsheeper 已提交
61
}
62

X
XuanYang-cn 已提交
63 64 65 66
// Init initializes the package-level Params by delegating to Params.Init.
func Init() {
	Params.Init()
}

67 68 69 70 71 72
// NewQueryNode constructs a query node with the given ID, whose lifecycle
// context is derived from ctx, and returns it behind the Node interface.
func NewQueryNode(ctx context.Context, queryNodeID uint64) Node {
	return newQueryNode(ctx, queryNodeID)
}

func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
N
neza2017 已提交
73

X
XuanYang-cn 已提交
74
	ctx1, cancel := context.WithCancel(ctx)
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
	q := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
	}

	var err error
	cfg := &config.Configuration{
		ServiceName: "query_node",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
S
sunby 已提交
94
	q.tracer, q.closer, err = cfg.NewTracer()
95 96 97 98
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(q.tracer)
X
XuanYang-cn 已提交
99

B
bigsheeper 已提交
100
	segmentsMap := make(map[int64]*Segment)
101
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
102

103 104
	tSafe := newTSafe()

105
	q.replica = &collectionReplicaImpl{
G
godchen 已提交
106 107
		collections: collections,
		segments:    segmentsMap,
108 109

		tSafe: tSafe,
G
godchen 已提交
110 111
	}

112
	return q
B
bigsheeper 已提交
113 114
}

X
XuanYang-cn 已提交
115 116 117 118 119
func (node *QueryNode) Start() error {
	// todo add connectMaster logic
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
120 121
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
B
bigsheeper 已提交
122

123
	go node.dataSyncService.start()
N
neza2017 已提交
124
	go node.searchService.start()
B
bigsheeper 已提交
125
	go node.metaService.start()
126
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
127
	go node.statsService.start()
128 129

	<-node.queryNodeLoopCtx.Done()
X
XuanYang-cn 已提交
130
	return nil
B
bigsheeper 已提交
131
}
B
bigsheeper 已提交
132

B
bigsheeper 已提交
133
func (node *QueryNode) Close() {
X
XuanYang-cn 已提交
134 135
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
136
	// free collectionReplica
X
XuanYang-cn 已提交
137
	node.replica.freeAll()
B
bigsheeper 已提交
138 139 140

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
141
		node.dataSyncService.close()
B
bigsheeper 已提交
142 143
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
144
		node.searchService.close()
B
bigsheeper 已提交
145
	}
B
bigsheeper 已提交
146 147 148
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
149
	if node.statsService != nil {
X
XuanYang-cn 已提交
150
		node.statsService.close()
B
bigsheeper 已提交
151
	}
152 153 154 155
	if node.closer != nil {
		node.closer.Close()
	}

R
rain 已提交
156
}