query_coord.go 9.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querycoord
13

X
xige-16 已提交
14
import (
X
xige-16 已提交
15
	"context"
S
sunby 已提交
16
	"math/rand"
17
	"path/filepath"
Z
zhenshan.cao 已提交
18
	"strconv"
19
	"sync"
X
xige-16 已提交
20
	"sync/atomic"
X
xige-16 已提交
21
	"time"
X
xige-16 已提交
22

23
	"github.com/golang/protobuf/proto"
24
	"go.etcd.io/etcd/api/v3/mvccpb"
25 26
	"go.uber.org/zap"

27
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
X
Xiangyu Wang 已提交
28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
30
	"github.com/milvus-io/milvus/internal/proto/commonpb"
X
Xiangyu Wang 已提交
31
	"github.com/milvus-io/milvus/internal/proto/internalpb"
32
	"github.com/milvus-io/milvus/internal/proto/querypb"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/types"
34
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
35
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
36
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
xige-16 已提交
37
)
38

39 40
type Timestamp = typeutil.Timestamp

41 42 43 44 45
type queryChannelInfo struct {
	requestChannel  string
	responseChannel string
}

46
type QueryCoord struct {
X
xige-16 已提交
47 48
	loopCtx    context.Context
	loopCancel context.CancelFunc
49 50
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV
X
xige-16 已提交
51

52
	queryCoordID uint64
53
	meta         Meta
54 55
	cluster      *queryNodeCluster
	scheduler    *TaskScheduler
X
xige-16 已提交
56

57 58
	dataCoordClient types.DataCoord
	rootCoordClient types.RootCoord
X
xige-16 已提交
59

60 61
	session   *sessionutil.Session
	eventChan <-chan *sessionutil.SessionEvent
G
godchen 已提交
62

X
xige-16 已提交
63 64 65
	stateCode  atomic.Value
	isInit     atomic.Value
	enableGrpc bool
G
groot 已提交
66 67

	msFactory msgstream.Factory
X
xige-16 已提交
68 69
}

70
// Register register query service at etcd
71
func (qc *QueryCoord) Register() error {
72
	log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
73 74 75
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
	qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
	Params.NodeID = uint64(qc.session.ServerID)
76 77 78
	return nil
}

79
func (qc *QueryCoord) Init() error {
80
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
81
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
82 83 84
		if err != nil {
			return err
		}
85
		qc.kvClient = etcdKV
86
		return nil
87
	}
88
	log.Debug("query coordinator try to connect etcd")
G
godchen 已提交
89
	err := retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
90
	if err != nil {
91
		log.Debug("query coordinator try to connect etcd failed", zap.Error(err))
92 93
		return err
	}
94
	log.Debug("query coordinator try to connect etcd success")
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
	qc.meta, err = newMeta(qc.kvClient)
	if err != nil {
		log.Error("query coordinator init meta failed", zap.Error(err))
		return err
	}

	qc.cluster, err = newQueryNodeCluster(qc.meta, qc.kvClient)
	if err != nil {
		log.Error("query coordinator init cluster failed", zap.Error(err))
		return err
	}

	qc.scheduler, err = NewTaskScheduler(qc.loopCtx, qc.meta, qc.cluster, qc.kvClient, qc.rootCoordClient, qc.dataCoordClient)
	if err != nil {
		log.Error("query coordinator init task scheduler failed", zap.Error(err))
		return err
	}

X
xige-16 已提交
113 114 115
	return nil
}

116 117
func (qc *QueryCoord) Start() error {
	qc.scheduler.Start()
118
	log.Debug("start scheduler ...")
119
	qc.UpdateStateCode(internalpb.StateCode_Healthy)
120

121 122
	qc.loopWg.Add(1)
	go qc.watchNodeLoop()
123

124 125
	qc.loopWg.Add(1)
	go qc.watchMetaLoop()
126

X
xige-16 已提交
127
	return nil
X
xige-16 已提交
128 129
}

130 131
func (qc *QueryCoord) Stop() error {
	qc.scheduler.Close()
132
	log.Debug("close scheduler ...")
133 134
	qc.loopCancel()
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)
135

136
	qc.loopWg.Wait()
X
xige-16 已提交
137
	return nil
X
xige-16 已提交
138 139
}

140 141
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
142 143
}

144
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
S
sunby 已提交
145
	rand.Seed(time.Now().UnixNano())
146
	queryChannels := make([]*queryChannelInfo, 0)
Z
zhenshan.cao 已提交
147 148 149 150 151 152 153 154 155 156 157
	channelID := len(queryChannels)
	searchPrefix := Params.SearchChannelPrefix
	searchResultPrefix := Params.SearchResultChannelPrefix
	allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
	allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

	queryChannels = append(queryChannels, &queryChannelInfo{
		requestChannel:  allocatedQueryChannel,
		responseChannel: allocatedQueryResultChannel,
	})

X
xige-16 已提交
158
	ctx1, cancel := context.WithCancel(ctx)
159
	service := &QueryCoord{
160 161 162
		loopCtx:    ctx1,
		loopCancel: cancel,
		msFactory:  factory,
X
xige-16 已提交
163
	}
X
XuanYang-cn 已提交
164

G
godchen 已提交
165
	service.UpdateStateCode(internalpb.StateCode_Abnormal)
166
	log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
X
xige-16 已提交
167
	return service, nil
168
}
X
xige-16 已提交
169

170 171
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) {
	qc.rootCoordClient = rootCoord
X
xige-16 已提交
172 173
}

174 175
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) {
	qc.dataCoordClient = dataCoord
X
xige-16 已提交
176
}
177

178 179
func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
180
	defer cancel()
181 182
	defer qc.loopWg.Done()
	log.Debug("query coordinator start watch node loop")
183

184
	clusterStartSession, version, _ := qc.session.GetSessions(typeutil.QueryNodeRole)
185 186 187 188 189 190
	sessionMap := make(map[int64]*sessionutil.Session)
	for _, session := range clusterStartSession {
		nodeID := session.ServerID
		sessionMap[nodeID] = session
	}
	for nodeID, session := range sessionMap {
191
		if _, ok := qc.cluster.nodes[nodeID]; !ok {
192
			serverID := session.ServerID
193
			log.Debug("start add a queryNode to cluster", zap.Any("nodeID", serverID))
194
			err := qc.cluster.registerNode(ctx, session, serverID)
195 196 197
			if err != nil {
				log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
			}
198 199
		}
	}
200
	for nodeID := range qc.cluster.nodes {
201
		if _, ok := sessionMap[nodeID]; !ok {
202
			qc.cluster.stopNode(nodeID)
203 204 205
			loadBalanceSegment := &querypb.LoadBalanceRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_LoadBalanceSegments,
206
					SourceID: qc.session.ServerID,
207 208 209 210 211 212
				},
				SourceNodeIDs: []int64{nodeID},
			}

			loadBalanceTask := &LoadBalanceTask{
				BaseTask: BaseTask{
213 214
					ctx:              qc.loopCtx,
					Condition:        NewTaskCondition(qc.loopCtx),
215 216 217
					triggerCondition: querypb.TriggerCondition_nodeDown,
				},
				LoadBalanceRequest: loadBalanceSegment,
218 219 220 221
				rootCoord:          qc.rootCoordClient,
				dataCoord:          qc.dataCoordClient,
				cluster:            qc.cluster,
				meta:               qc.meta,
222
			}
223
			qc.scheduler.Enqueue([]task{loadBalanceTask})
224 225 226
		}
	}

227
	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, version+1)
228 229 230 231
	for {
		select {
		case <-ctx.Done():
			return
232
		case event := <-qc.eventChan:
233 234 235
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
236
				log.Debug("start add a queryNode to cluster", zap.Any("nodeID", serverID))
237
				err := qc.cluster.registerNode(ctx, event.Session, serverID)
238 239 240
				if err != nil {
					log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
				}
241 242
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
				log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
				_, err := qc.cluster.getNodeByID(serverID)
				if err != nil {
					log.Error("queryNode not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.stopNode(serverID)
				loadBalanceSegment := &querypb.LoadBalanceRequest{
					Base: &commonpb.MsgBase{
						MsgType:  commonpb.MsgType_LoadBalanceSegments,
						SourceID: qc.session.ServerID,
					},
					SourceNodeIDs: []int64{serverID},
					BalanceReason: querypb.TriggerCondition_nodeDown,
				}

				loadBalanceTask := &LoadBalanceTask{
					BaseTask: BaseTask{
						ctx:              qc.loopCtx,
						Condition:        NewTaskCondition(qc.loopCtx),
						triggerCondition: querypb.TriggerCondition_nodeDown,
					},
					LoadBalanceRequest: loadBalanceSegment,
					rootCoord:          qc.rootCoordClient,
					dataCoord:          qc.dataCoordClient,
					cluster:            qc.cluster,
					meta:               qc.meta,
271
				}
272
				qc.scheduler.Enqueue([]task{loadBalanceTask})
273 274 275 276 277
			}
		}
	}
}

278 279
func (qc *QueryCoord) watchMetaLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
280 281

	defer cancel()
282
	defer qc.loopWg.Done()
283
	log.Debug("query coordinator start watch MetaReplica loop")
284

285
	watchChan := qc.kvClient.WatchWithPrefix("queryNode-segmentMeta")
286 287 288 289 290 291

	for {
		select {
		case <-ctx.Done():
			return
		case resp := <-watchChan:
292
			log.Debug("segment MetaReplica updated.")
293 294 295
			for _, event := range resp.Events {
				segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64)
				if err != nil {
296
					log.Error("watch MetaReplica loop error when get segmentID", zap.Any("error", err.Error()))
297 298 299 300
				}
				segmentInfo := &querypb.SegmentInfo{}
				err = proto.UnmarshalText(string(event.Kv.Value), segmentInfo)
				if err != nil {
301
					log.Error("watch MetaReplica loop error when unmarshal", zap.Any("error", err.Error()))
302 303 304 305
				}
				switch event.Type {
				case mvccpb.PUT:
					//TODO::
306
					qc.meta.setSegmentInfo(segmentID, segmentInfo)
307 308 309 310 311 312 313 314
				case mvccpb.DELETE:
					//TODO::
				}
			}
		}
	}

}