query_node.go 3.7 KB
Newer Older
1
package querynodeimp
B
bigsheeper 已提交
2

3 4
/*

5
#cgo CFLAGS: -I${SRCDIR}/../core/output/include
6

G
GuoRentong 已提交
7
#cgo LDFLAGS: -L${SRCDIR}/../core/output/lib -lmilvus_segcore -Wl,-rpath=${SRCDIR}/../core/output/lib
8

F
FluorineDog 已提交
9 10
#include "segcore/collection_c.h"
#include "segcore/segment_c.h"
11 12

*/
B
bigsheeper 已提交
13
import "C"
14

B
bigsheeper 已提交
15
import (
16
	"context"
17 18 19 20 21
	"fmt"
	"io"

	"github.com/opentracing/opentracing-go"
	"github.com/uber/jaeger-client-go/config"
22 23 24 25
	"google.golang.org/grpc"

	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	queryPb "github.com/zilliztech/milvus-distributed/internal/proto/querypb"
B
bigsheeper 已提交
26 27
)

28 29 30 31 32 33 34 35 36 37 38 39
// Node is the interface returned by NewQueryNode. It combines lifecycle
// control (Start/Close) with a set of request handlers that each take a
// context and a querypb request message.
type Node interface {
	// Start runs the node. The QueryNode implementation in this file blocks
	// until the node's internal context is cancelled and then returns nil.
	Start() error
	// Close stops the node. The QueryNode implementation in this file cancels
	// the internal context and closes the node's services.
	Close()

	// Request handlers. Their implementations are not visible in this part of
	// the file; judging by the signatures they report the outcome through a
	// commonpb.Status (or a dedicated response message) plus an error.
	AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelsRequest) (*commonpb.Status, error)
	RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelsRequest) (*commonpb.Status, error)
	WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error)
	LoadSegments(ctx context.Context, in *queryPb.LoadSegmentRequest) (*commonpb.Status, error)
	ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentRequest) (*commonpb.Status, error)
	GetPartitionState(ctx context.Context, in *queryPb.PartitionStatesRequest) (*queryPb.PartitionStatesResponse, error)
}

B
bigsheeper 已提交
40
type QueryNode struct {
X
XuanYang-cn 已提交
41
	queryNodeLoopCtx    context.Context
42
	queryNodeLoopCancel context.CancelFunc
43

B
bigsheeper 已提交
44
	QueryNodeID uint64
45
	grpcServer  *grpc.Server
B
bigsheeper 已提交
46

X
XuanYang-cn 已提交
47
	replica collectionReplica
B
bigsheeper 已提交
48

49
	// internal services
50 51 52 53 54
	dataSyncService  *dataSyncService
	metaService      *metaService
	searchService    *searchService
	loadIndexService *loadIndexService
	statsService     *statsService
55 56 57 58

	//opentracing
	tracer opentracing.Tracer
	closer io.Closer
B
bigsheeper 已提交
59
}
60

X
XuanYang-cn 已提交
61 62 63 64
// Init initializes the package-level Params by calling Params.Init().
// Params is declared outside this part of the file; presumably it is the
// query node's parameter table - confirm against its definition.
func Init() {
	Params.Init()
}

65 66 67 68 69 70
// NewQueryNode builds a QueryNode with the given parent context and ID and
// returns it behind the Node interface.
func NewQueryNode(ctx context.Context, queryNodeID uint64) Node {
	return newQueryNode(ctx, queryNodeID)
}

func newQueryNode(ctx context.Context, queryNodeID uint64) *QueryNode {
X
XuanYang-cn 已提交
71 72

	ctx1, cancel := context.WithCancel(ctx)
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
	q := &QueryNode{
		queryNodeLoopCtx:    ctx1,
		queryNodeLoopCancel: cancel,
		QueryNodeID:         queryNodeID,

		dataSyncService: nil,
		metaService:     nil,
		searchService:   nil,
		statsService:    nil,
	}

	var err error
	cfg := &config.Configuration{
		ServiceName: "query_node",
		Sampler: &config.SamplerConfig{
			Type:  "const",
			Param: 1,
		},
	}
S
sunby 已提交
92
	q.tracer, q.closer, err = cfg.NewTracer()
93 94 95 96
	if err != nil {
		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
	}
	opentracing.SetGlobalTracer(q.tracer)
X
XuanYang-cn 已提交
97

B
bigsheeper 已提交
98
	segmentsMap := make(map[int64]*Segment)
99
	collections := make([]*Collection, 0)
B
bigsheeper 已提交
100

101 102
	tSafe := newTSafe()

103
	q.replica = &collectionReplicaImpl{
G
godchen 已提交
104 105
		collections: collections,
		segments:    segmentsMap,
106 107

		tSafe: tSafe,
G
godchen 已提交
108 109
	}

110
	return q
B
bigsheeper 已提交
111 112
}

X
XuanYang-cn 已提交
113 114 115 116 117
func (node *QueryNode) Start() error {
	// todo add connectMaster logic
	node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
	node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
	node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
118 119
	node.loadIndexService = newLoadIndexService(node.queryNodeLoopCtx, node.replica)
	node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadIndexService.fieldStatsChan)
B
bigsheeper 已提交
120

121
	go node.dataSyncService.start()
N
neza2017 已提交
122
	go node.searchService.start()
B
bigsheeper 已提交
123
	go node.metaService.start()
124
	go node.loadIndexService.start()
X
XuanYang-cn 已提交
125
	go node.statsService.start()
126 127

	<-node.queryNodeLoopCtx.Done()
X
XuanYang-cn 已提交
128
	return nil
B
bigsheeper 已提交
129
}
B
bigsheeper 已提交
130

B
bigsheeper 已提交
131
func (node *QueryNode) Close() {
X
XuanYang-cn 已提交
132 133
	node.queryNodeLoopCancel()

B
bigsheeper 已提交
134
	// free collectionReplica
X
XuanYang-cn 已提交
135
	node.replica.freeAll()
B
bigsheeper 已提交
136 137 138

	// close services
	if node.dataSyncService != nil {
X
XuanYang-cn 已提交
139
		node.dataSyncService.close()
B
bigsheeper 已提交
140 141
	}
	if node.searchService != nil {
X
XuanYang-cn 已提交
142
		node.searchService.close()
B
bigsheeper 已提交
143
	}
B
bigsheeper 已提交
144 145 146
	if node.loadIndexService != nil {
		node.loadIndexService.close()
	}
B
bigsheeper 已提交
147
	if node.statsService != nil {
X
XuanYang-cn 已提交
148
		node.statsService.close()
B
bigsheeper 已提交
149
	}
150 151 152 153
	if node.closer != nil {
		node.closer.Close()
	}

R
rain 已提交
154
}