query_coord.go 9.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querycoord
13

X
xige-16 已提交
14
import (
X
xige-16 已提交
15
	"context"
S
sunby 已提交
16
	"math/rand"
17
	"path/filepath"
Z
zhenshan.cao 已提交
18
	"strconv"
19
	"sync"
X
xige-16 已提交
20
	"sync/atomic"
X
xige-16 已提交
21
	"time"
X
xige-16 已提交
22

23
	"github.com/golang/protobuf/proto"
24
	"go.etcd.io/etcd/api/v3/mvccpb"
25 26
	"go.uber.org/zap"

27
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/proto/internalpb"
32
	"github.com/milvus-io/milvus/internal/proto/querypb"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/types"
34
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
35
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
36
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
37
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
xige-16 已提交
38
)
39

40
// Timestamp is an alias for typeutil.Timestamp
41 42
type Timestamp = typeutil.Timestamp

43 44 45 46 47
// queryChannelInfo holds the names of the request channel and the response
// channel that together make up one query channel.
type queryChannelInfo struct {
	requestChannel, responseChannel string
}

48
// QueryCoord is the coordinator of queryNodes
49
type QueryCoord struct {
X
xige-16 已提交
50 51
	loopCtx    context.Context
	loopCancel context.CancelFunc
52 53
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV
X
xige-16 已提交
54

X
xige-16 已提交
55 56
	initOnce sync.Once

57
	queryCoordID uint64
58
	meta         Meta
59
	cluster      Cluster
X
xige-16 已提交
60
	newNodeFn    newQueryNodeFn
61
	scheduler    *TaskScheduler
X
xige-16 已提交
62

63 64
	metricsCacheManager *metricsinfo.MetricsCacheManager

65 66
	dataCoordClient types.DataCoord
	rootCoordClient types.RootCoord
X
xige-16 已提交
67

68
	session   *sessionutil.Session
C
congqixia 已提交
69
	liveCh    <-chan bool
70
	eventChan <-chan *sessionutil.SessionEvent
G
godchen 已提交
71

X
xige-16 已提交
72 73
	stateCode  atomic.Value
	enableGrpc bool
G
groot 已提交
74 75

	msFactory msgstream.Factory
X
xige-16 已提交
76 77
}

78
// Register register query service at etcd
79
func (qc *QueryCoord) Register() error {
80
	log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
81
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
C
congqixia 已提交
82
	qc.liveCh = qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
83
	Params.NodeID = uint64(qc.session.ServerID)
X
Xiaofan 已提交
84
	Params.SetLogger(typeutil.UniqueID(-1))
85 86 87
	return nil
}

X
xige-16 已提交
88
// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
89
func (qc *QueryCoord) Init() error {
90
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
91
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
92 93 94
		if err != nil {
			return err
		}
95
		qc.kvClient = etcdKV
96
		return nil
97
	}
X
xige-16 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111
	var initError error = nil
	qc.initOnce.Do(func() {
		log.Debug("query coordinator try to connect etcd")
		initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
		if initError != nil {
			log.Debug("query coordinator try to connect etcd failed", zap.Error(initError))
			return
		}
		log.Debug("query coordinator try to connect etcd success")
		qc.meta, initError = newMeta(qc.kvClient)
		if initError != nil {
			log.Error("query coordinator init meta failed", zap.Error(initError))
			return
		}
112

X
xige-16 已提交
113 114 115 116 117
		qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session)
		if initError != nil {
			log.Error("query coordinator init cluster failed", zap.Error(initError))
			return
		}
118

X
xige-16 已提交
119 120 121 122 123
		qc.scheduler, initError = NewTaskScheduler(qc.loopCtx, qc.meta, qc.cluster, qc.kvClient, qc.rootCoordClient, qc.dataCoordClient)
		if initError != nil {
			log.Error("query coordinator init task scheduler failed", zap.Error(initError))
			return
		}
124

X
xige-16 已提交
125 126
		qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	})
127

X
xige-16 已提交
128
	return initError
X
xige-16 已提交
129 130
}

X
xige-16 已提交
131
// Start function starts the goroutines to watch the meta and node updates
132 133
func (qc *QueryCoord) Start() error {
	qc.scheduler.Start()
134
	log.Debug("start scheduler ...")
135
	qc.UpdateStateCode(internalpb.StateCode_Healthy)
136

137 138
	qc.loopWg.Add(1)
	go qc.watchNodeLoop()
139

140 141
	qc.loopWg.Add(1)
	go qc.watchMetaLoop()
142

C
congqixia 已提交
143 144 145 146
	go qc.session.LivenessCheck(qc.loopCtx, qc.liveCh, func() {
		qc.Stop()
	})

X
xige-16 已提交
147
	return nil
X
xige-16 已提交
148 149
}

X
xige-16 已提交
150
// Stop function stops watching the meta and node updates
151 152
func (qc *QueryCoord) Stop() error {
	qc.scheduler.Close()
153
	log.Debug("close scheduler ...")
154 155
	qc.loopCancel()
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)
156

157
	qc.loopWg.Wait()
X
xige-16 已提交
158
	return nil
X
xige-16 已提交
159 160
}

161
// UpdateStateCode updates the status of the coord, including healthy, unhealthy
162 163
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
164 165
}

166
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
S
sunby 已提交
167
	rand.Seed(time.Now().UnixNano())
168
	queryChannels := make([]*queryChannelInfo, 0)
Z
zhenshan.cao 已提交
169 170 171 172 173 174 175 176 177 178 179
	channelID := len(queryChannels)
	searchPrefix := Params.SearchChannelPrefix
	searchResultPrefix := Params.SearchResultChannelPrefix
	allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
	allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

	queryChannels = append(queryChannels, &queryChannelInfo{
		requestChannel:  allocatedQueryChannel,
		responseChannel: allocatedQueryResultChannel,
	})

X
xige-16 已提交
180
	ctx1, cancel := context.WithCancel(ctx)
181
	service := &QueryCoord{
182 183 184
		loopCtx:    ctx1,
		loopCancel: cancel,
		msFactory:  factory,
X
xige-16 已提交
185
		newNodeFn:  newQueryNode,
X
xige-16 已提交
186
	}
X
XuanYang-cn 已提交
187

G
godchen 已提交
188
	service.UpdateStateCode(internalpb.StateCode_Abnormal)
189
	log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
X
xige-16 已提交
190
	return service, nil
191
}
X
xige-16 已提交
192

193
// SetRootCoord sets root coordinator's client
194 195
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) {
	qc.rootCoordClient = rootCoord
X
xige-16 已提交
196 197
}

198 199
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) {
	qc.dataCoordClient = dataCoord
X
xige-16 已提交
200
}
201

202 203
func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
204
	defer cancel()
205 206
	defer qc.loopWg.Done()
	log.Debug("query coordinator start watch node loop")
207

208 209 210 211 212 213 214 215 216 217 218 219
	offlineNodes, err := qc.cluster.offlineNodes()
	if err == nil {
		offlineNodeIDs := make([]int64, 0)
		for id := range offlineNodes {
			offlineNodeIDs = append(offlineNodeIDs, id)
		}
		loadBalanceSegment := &querypb.LoadBalanceRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_LoadBalanceSegments,
				SourceID: qc.session.ServerID,
			},
			SourceNodeIDs: offlineNodeIDs,
220 221
		}

222 223 224 225 226 227 228 229 230 231 232
		loadBalanceTask := &LoadBalanceTask{
			BaseTask: BaseTask{
				ctx:              qc.loopCtx,
				Condition:        NewTaskCondition(qc.loopCtx),
				triggerCondition: querypb.TriggerCondition_nodeDown,
			},
			LoadBalanceRequest: loadBalanceSegment,
			rootCoord:          qc.rootCoordClient,
			dataCoord:          qc.dataCoordClient,
			cluster:            qc.cluster,
			meta:               qc.meta,
233
		}
234 235
		qc.scheduler.Enqueue([]task{loadBalanceTask})
		log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
236 237
	}

238
	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1)
239 240 241 242
	for {
		select {
		case <-ctx.Done():
			return
243
		case event := <-qc.eventChan:
244 245 246
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
247
				log.Debug("start add a queryNode to cluster", zap.Any("nodeID", serverID))
248
				err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
249 250 251
				if err != nil {
					log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
				}
252
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
253 254
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
				log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
				_, err := qc.cluster.getNodeByID(serverID)
				if err != nil {
					log.Error("queryNode not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.stopNode(serverID)
				loadBalanceSegment := &querypb.LoadBalanceRequest{
					Base: &commonpb.MsgBase{
						MsgType:  commonpb.MsgType_LoadBalanceSegments,
						SourceID: qc.session.ServerID,
					},
					SourceNodeIDs: []int64{serverID},
					BalanceReason: querypb.TriggerCondition_nodeDown,
				}

				loadBalanceTask := &LoadBalanceTask{
					BaseTask: BaseTask{
						ctx:              qc.loopCtx,
						Condition:        NewTaskCondition(qc.loopCtx),
						triggerCondition: querypb.TriggerCondition_nodeDown,
					},
					LoadBalanceRequest: loadBalanceSegment,
					rootCoord:          qc.rootCoordClient,
					dataCoord:          qc.dataCoordClient,
					cluster:            qc.cluster,
					meta:               qc.meta,
283
				}
284
				qc.scheduler.Enqueue([]task{loadBalanceTask})
285
				log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
286
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
287 288 289 290 291
			}
		}
	}
}

292 293
func (qc *QueryCoord) watchMetaLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
294 295

	defer cancel()
296
	defer qc.loopWg.Done()
297
	log.Debug("query coordinator start watch MetaReplica loop")
298

299
	watchChan := qc.kvClient.WatchWithPrefix("queryNode-segmentMeta")
300 301 302 303 304 305

	for {
		select {
		case <-ctx.Done():
			return
		case resp := <-watchChan:
306
			log.Debug("segment MetaReplica updated.")
307 308 309
			for _, event := range resp.Events {
				segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64)
				if err != nil {
310
					log.Error("watch MetaReplica loop error when get segmentID", zap.Any("error", err.Error()))
311 312
				}
				segmentInfo := &querypb.SegmentInfo{}
313
				err = proto.Unmarshal(event.Kv.Value, segmentInfo)
314
				if err != nil {
315
					log.Error("watch MetaReplica loop error when unmarshal", zap.Any("error", err.Error()))
316 317 318 319
				}
				switch event.Type {
				case mvccpb.PUT:
					//TODO::
320
					qc.meta.setSegmentInfo(segmentID, segmentInfo)
321 322 323 324 325 326 327 328
				case mvccpb.DELETE:
					//TODO::
				}
			}
		}
	}

}