query_coord.go 10.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querycoord
13

X
xige-16 已提交
14
import (
X
xige-16 已提交
15
	"context"
S
sunby 已提交
16
	"math/rand"
17
	"path/filepath"
Z
zhenshan.cao 已提交
18
	"strconv"
19
	"sync"
X
xige-16 已提交
20
	"sync/atomic"
X
xige-16 已提交
21
	"time"
X
xige-16 已提交
22

23
	"github.com/golang/protobuf/proto"
24
	"go.etcd.io/etcd/api/v3/mvccpb"
25 26
	"go.uber.org/zap"

27
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/proto/internalpb"
32
	"github.com/milvus-io/milvus/internal/proto/querypb"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/types"
34
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
35
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
36
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
37
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
xige-16 已提交
38
)
39

40
// Timestamp is an alias for typeutil.Timestamp.
41 42
type Timestamp = typeutil.Timestamp

43 44 45 46 47
// queryChannelInfo pairs the name of a query request channel with the name of
// the channel on which the matching results are delivered.
type queryChannelInfo struct {
	requestChannel, responseChannel string
}

48
// QueryCoord is the coordinator of queryNodes
49
type QueryCoord struct {
X
xige-16 已提交
50 51
	loopCtx    context.Context
	loopCancel context.CancelFunc
52 53
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV
X
xige-16 已提交
54

X
xige-16 已提交
55 56
	initOnce sync.Once

57
	queryCoordID uint64
58
	meta         Meta
59
	cluster      Cluster
X
xige-16 已提交
60
	newNodeFn    newQueryNodeFn
61
	scheduler    *TaskScheduler
X
xige-16 已提交
62

63 64
	metricsCacheManager *metricsinfo.MetricsCacheManager

65 66
	dataCoordClient types.DataCoord
	rootCoordClient types.RootCoord
X
xige-16 已提交
67

68
	session   *sessionutil.Session
C
congqixia 已提交
69
	liveCh    <-chan bool
70
	eventChan <-chan *sessionutil.SessionEvent
G
godchen 已提交
71

X
xige-16 已提交
72 73
	stateCode  atomic.Value
	enableGrpc bool
G
groot 已提交
74 75

	msFactory msgstream.Factory
X
xige-16 已提交
76 77
}

78
// Register register query service at etcd
79
func (qc *QueryCoord) Register() error {
80
	log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
81
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
C
congqixia 已提交
82
	qc.liveCh = qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
83
	Params.NodeID = uint64(qc.session.ServerID)
X
Xiaofan 已提交
84
	Params.SetLogger(typeutil.UniqueID(-1))
85 86 87
	return nil
}

X
xige-16 已提交
88
// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
89
func (qc *QueryCoord) Init() error {
90
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
91
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
92 93 94
		if err != nil {
			return err
		}
95
		qc.kvClient = etcdKV
96
		return nil
97
	}
X
xige-16 已提交
98 99 100 101 102 103 104 105 106 107 108 109 110 111
	var initError error = nil
	qc.initOnce.Do(func() {
		log.Debug("query coordinator try to connect etcd")
		initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
		if initError != nil {
			log.Debug("query coordinator try to connect etcd failed", zap.Error(initError))
			return
		}
		log.Debug("query coordinator try to connect etcd success")
		qc.meta, initError = newMeta(qc.kvClient)
		if initError != nil {
			log.Error("query coordinator init meta failed", zap.Error(initError))
			return
		}
112

X
xige-16 已提交
113 114 115 116 117
		qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session)
		if initError != nil {
			log.Error("query coordinator init cluster failed", zap.Error(initError))
			return
		}
118

X
xige-16 已提交
119 120 121 122 123
		qc.scheduler, initError = NewTaskScheduler(qc.loopCtx, qc.meta, qc.cluster, qc.kvClient, qc.rootCoordClient, qc.dataCoordClient)
		if initError != nil {
			log.Error("query coordinator init task scheduler failed", zap.Error(initError))
			return
		}
124

X
xige-16 已提交
125 126
		qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	})
127

X
xige-16 已提交
128
	return initError
X
xige-16 已提交
129 130
}

X
xige-16 已提交
131
// Start function starts the goroutines to watch the meta and node updates
132 133
func (qc *QueryCoord) Start() error {
	qc.scheduler.Start()
134
	log.Debug("start scheduler ...")
135
	qc.UpdateStateCode(internalpb.StateCode_Healthy)
136

137 138
	qc.loopWg.Add(1)
	go qc.watchNodeLoop()
139

140 141
	qc.loopWg.Add(1)
	go qc.watchMetaLoop()
142

C
congqixia 已提交
143 144 145 146
	go qc.session.LivenessCheck(qc.loopCtx, qc.liveCh, func() {
		qc.Stop()
	})

X
xige-16 已提交
147
	return nil
X
xige-16 已提交
148 149
}

X
xige-16 已提交
150
// Stop function stops watching the meta and node updates
151 152
func (qc *QueryCoord) Stop() error {
	qc.scheduler.Close()
153
	log.Debug("close scheduler ...")
154 155
	qc.loopCancel()
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)
156

157
	qc.loopWg.Wait()
X
xige-16 已提交
158
	return nil
X
xige-16 已提交
159 160
}

161
// UpdateStateCode updates the status of the coord, including healthy, unhealthy
162 163
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
164 165
}

166
// NewQueryCoord creates a QueryCoord object.
167
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
S
sunby 已提交
168
	rand.Seed(time.Now().UnixNano())
169
	queryChannels := make([]*queryChannelInfo, 0)
Z
zhenshan.cao 已提交
170 171 172 173 174 175 176 177 178 179 180
	channelID := len(queryChannels)
	searchPrefix := Params.SearchChannelPrefix
	searchResultPrefix := Params.SearchResultChannelPrefix
	allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
	allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

	queryChannels = append(queryChannels, &queryChannelInfo{
		requestChannel:  allocatedQueryChannel,
		responseChannel: allocatedQueryResultChannel,
	})

X
xige-16 已提交
181
	ctx1, cancel := context.WithCancel(ctx)
182
	service := &QueryCoord{
183 184 185
		loopCtx:    ctx1,
		loopCancel: cancel,
		msFactory:  factory,
X
xige-16 已提交
186
		newNodeFn:  newQueryNode,
X
xige-16 已提交
187
	}
X
XuanYang-cn 已提交
188

G
godchen 已提交
189
	service.UpdateStateCode(internalpb.StateCode_Abnormal)
190
	log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
X
xige-16 已提交
191
	return service, nil
192
}
X
xige-16 已提交
193

194
// SetRootCoord sets root coordinator's client
195 196
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) {
	qc.rootCoordClient = rootCoord
X
xige-16 已提交
197 198
}

199
// SetDataCoord sets data coordinator's client
200 201
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) {
	qc.dataCoordClient = dataCoord
X
xige-16 已提交
202
}
203

204 205
func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
206
	defer cancel()
207 208
	defer qc.loopWg.Done()
	log.Debug("query coordinator start watch node loop")
209

210 211 212 213 214 215 216 217 218 219 220 221
	offlineNodes, err := qc.cluster.offlineNodes()
	if err == nil {
		offlineNodeIDs := make([]int64, 0)
		for id := range offlineNodes {
			offlineNodeIDs = append(offlineNodeIDs, id)
		}
		loadBalanceSegment := &querypb.LoadBalanceRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_LoadBalanceSegments,
				SourceID: qc.session.ServerID,
			},
			SourceNodeIDs: offlineNodeIDs,
222 223
		}

224 225 226 227 228 229 230 231 232 233 234
		loadBalanceTask := &LoadBalanceTask{
			BaseTask: BaseTask{
				ctx:              qc.loopCtx,
				Condition:        NewTaskCondition(qc.loopCtx),
				triggerCondition: querypb.TriggerCondition_nodeDown,
			},
			LoadBalanceRequest: loadBalanceSegment,
			rootCoord:          qc.rootCoordClient,
			dataCoord:          qc.dataCoordClient,
			cluster:            qc.cluster,
			meta:               qc.meta,
235
		}
236 237
		qc.scheduler.Enqueue([]task{loadBalanceTask})
		log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
238 239
	}

240
	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1)
241 242 243 244
	for {
		select {
		case <-ctx.Done():
			return
245
		case event := <-qc.eventChan:
246 247 248
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
249
				log.Debug("start add a queryNode to cluster", zap.Any("nodeID", serverID))
250
				err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
251 252 253
				if err != nil {
					log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
				}
254
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
255 256
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
				log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
				_, err := qc.cluster.getNodeByID(serverID)
				if err != nil {
					log.Error("queryNode not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.stopNode(serverID)
				loadBalanceSegment := &querypb.LoadBalanceRequest{
					Base: &commonpb.MsgBase{
						MsgType:  commonpb.MsgType_LoadBalanceSegments,
						SourceID: qc.session.ServerID,
					},
					SourceNodeIDs: []int64{serverID},
					BalanceReason: querypb.TriggerCondition_nodeDown,
				}

				loadBalanceTask := &LoadBalanceTask{
					BaseTask: BaseTask{
						ctx:              qc.loopCtx,
						Condition:        NewTaskCondition(qc.loopCtx),
						triggerCondition: querypb.TriggerCondition_nodeDown,
					},
					LoadBalanceRequest: loadBalanceSegment,
					rootCoord:          qc.rootCoordClient,
					dataCoord:          qc.dataCoordClient,
					cluster:            qc.cluster,
					meta:               qc.meta,
285
				}
286
				qc.scheduler.Enqueue([]task{loadBalanceTask})
287
				log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
288
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
289 290 291 292 293
			}
		}
	}
}

294 295
func (qc *QueryCoord) watchMetaLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
296 297

	defer cancel()
298
	defer qc.loopWg.Done()
299
	log.Debug("query coordinator start watch MetaReplica loop")
300

301
	watchChan := qc.kvClient.WatchWithPrefix("queryNode-segmentMeta")
302 303 304 305 306 307

	for {
		select {
		case <-ctx.Done():
			return
		case resp := <-watchChan:
308
			log.Debug("segment MetaReplica updated.")
309 310 311
			for _, event := range resp.Events {
				segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64)
				if err != nil {
312
					log.Error("watch MetaReplica loop error when get segmentID", zap.Any("error", err.Error()))
313 314
				}
				segmentInfo := &querypb.SegmentInfo{}
315
				err = proto.Unmarshal(event.Kv.Value, segmentInfo)
316
				if err != nil {
317
					log.Error("watch MetaReplica loop error when unmarshal", zap.Any("error", err.Error()))
318 319 320 321
				}
				switch event.Type {
				case mvccpb.PUT:
					//TODO::
322
					qc.meta.setSegmentInfo(segmentID, segmentInfo)
323 324 325 326 327 328 329 330
				case mvccpb.DELETE:
					//TODO::
				}
			}
		}
	}

}