query_coord.go 9.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querycoord
13

X
xige-16 已提交
14
import (
X
xige-16 已提交
15
	"context"
S
sunby 已提交
16
	"math/rand"
17
	"path/filepath"
Z
zhenshan.cao 已提交
18
	"strconv"
19
	"sync"
X
xige-16 已提交
20
	"sync/atomic"
X
xige-16 已提交
21
	"time"
X
xige-16 已提交
22

23 24 25
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/clientv3"
26 27
	"go.uber.org/zap"

28
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
X
Xiangyu Wang 已提交
29 30
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
31
	"github.com/milvus-io/milvus/internal/proto/commonpb"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/proto/internalpb"
33
	"github.com/milvus-io/milvus/internal/proto/querypb"
X
Xiangyu Wang 已提交
34
	"github.com/milvus-io/milvus/internal/types"
35
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
36
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
37
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
xige-16 已提交
38
)
39

40 41
// Timestamp is an alias of typeutil.Timestamp.
type Timestamp = typeutil.Timestamp

42 43 44 45 46
// queryChannelInfo pairs the name of a request channel with the name of the
// channel on which the matching responses are delivered.
type queryChannelInfo struct {
	requestChannel  string // name of the request channel
	responseChannel string // name of the response channel
}

47
type QueryCoord struct {
X
xige-16 已提交
48 49
	loopCtx    context.Context
	loopCancel context.CancelFunc
50 51
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV
X
xige-16 已提交
52

53
	queryCoordID uint64
54
	meta         Meta
55 56
	cluster      *queryNodeCluster
	scheduler    *TaskScheduler
X
xige-16 已提交
57

58 59
	dataCoordClient types.DataCoord
	rootCoordClient types.RootCoord
X
xige-16 已提交
60

61 62
	session   *sessionutil.Session
	eventChan <-chan *sessionutil.SessionEvent
G
godchen 已提交
63

X
xige-16 已提交
64 65 66
	stateCode  atomic.Value
	isInit     atomic.Value
	enableGrpc bool
G
groot 已提交
67 68

	msFactory msgstream.Factory
X
xige-16 已提交
69 70
}

71
// Register register query service at etcd
72
func (qc *QueryCoord) Register() error {
73
	log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
74 75 76
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
	qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
	Params.NodeID = uint64(qc.session.ServerID)
77 78 79
	return nil
}

80
func (qc *QueryCoord) Init() error {
81 82 83 84 85 86
	connectEtcdFn := func() error {
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
		if err != nil {
			return err
		}
		etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
87
		qc.kvClient = etcdKV
88 89 90 91
		metaKV, err := newMeta(etcdKV)
		if err != nil {
			return err
		}
92 93
		qc.meta = metaKV
		qc.cluster, err = newQueryNodeCluster(metaKV, etcdKV)
94 95 96
		if err != nil {
			return err
		}
Z
zhenshan.cao 已提交
97

98
		qc.scheduler, err = NewTaskScheduler(qc.loopCtx, metaKV, qc.cluster, etcdKV, qc.rootCoordClient, qc.dataCoordClient)
99 100
		return err
	}
101
	log.Debug("query coordinator try to connect etcd")
G
godchen 已提交
102
	err := retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
103
	if err != nil {
104
		log.Debug("query coordinator try to connect etcd failed", zap.Error(err))
105 106
		return err
	}
107
	log.Debug("query coordinator try to connect etcd success")
X
xige-16 已提交
108 109 110
	return nil
}

111 112
func (qc *QueryCoord) Start() error {
	qc.scheduler.Start()
113
	log.Debug("start scheduler ...")
114
	qc.UpdateStateCode(internalpb.StateCode_Healthy)
115

116 117
	qc.loopWg.Add(1)
	go qc.watchNodeLoop()
118

119 120
	qc.loopWg.Add(1)
	go qc.watchMetaLoop()
121

X
xige-16 已提交
122
	return nil
X
xige-16 已提交
123 124
}

125 126
func (qc *QueryCoord) Stop() error {
	qc.scheduler.Close()
127
	log.Debug("close scheduler ...")
128 129
	qc.loopCancel()
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)
130

131
	qc.loopWg.Wait()
X
xige-16 已提交
132
	return nil
X
xige-16 已提交
133 134
}

135 136
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
137 138
}

139
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
S
sunby 已提交
140
	rand.Seed(time.Now().UnixNano())
141
	queryChannels := make([]*queryChannelInfo, 0)
Z
zhenshan.cao 已提交
142 143 144 145 146 147 148 149 150 151 152
	channelID := len(queryChannels)
	searchPrefix := Params.SearchChannelPrefix
	searchResultPrefix := Params.SearchResultChannelPrefix
	allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
	allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

	queryChannels = append(queryChannels, &queryChannelInfo{
		requestChannel:  allocatedQueryChannel,
		responseChannel: allocatedQueryResultChannel,
	})

X
xige-16 已提交
153
	ctx1, cancel := context.WithCancel(ctx)
154
	service := &QueryCoord{
155 156 157
		loopCtx:    ctx1,
		loopCancel: cancel,
		msFactory:  factory,
X
xige-16 已提交
158
	}
X
XuanYang-cn 已提交
159

G
godchen 已提交
160
	service.UpdateStateCode(internalpb.StateCode_Abnormal)
161
	log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
X
xige-16 已提交
162
	return service, nil
163
}
X
xige-16 已提交
164

165 166
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) {
	qc.rootCoordClient = rootCoord
X
xige-16 已提交
167 168
}

169 170
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) {
	qc.dataCoordClient = dataCoord
X
xige-16 已提交
171
}
172

173 174
func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
175
	defer cancel()
176 177
	defer qc.loopWg.Done()
	log.Debug("query coordinator start watch node loop")
178

179
	clusterStartSession, version, _ := qc.session.GetSessions(typeutil.QueryNodeRole)
180 181 182 183 184 185
	sessionMap := make(map[int64]*sessionutil.Session)
	for _, session := range clusterStartSession {
		nodeID := session.ServerID
		sessionMap[nodeID] = session
	}
	for nodeID, session := range sessionMap {
186
		if _, ok := qc.cluster.nodes[nodeID]; !ok {
187
			serverID := session.ServerID
188
			log.Debug("start add a queryNode to cluster", zap.Any("nodeID", serverID))
189
			err := qc.cluster.registerNode(ctx, session, serverID)
190 191 192
			if err != nil {
				log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
			}
193 194
		}
	}
195
	for nodeID := range qc.cluster.nodes {
196
		if _, ok := sessionMap[nodeID]; !ok {
197
			qc.cluster.stopNode(nodeID)
198 199 200
			loadBalanceSegment := &querypb.LoadBalanceRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_LoadBalanceSegments,
201
					SourceID: qc.session.ServerID,
202 203 204 205 206 207
				},
				SourceNodeIDs: []int64{nodeID},
			}

			loadBalanceTask := &LoadBalanceTask{
				BaseTask: BaseTask{
208 209
					ctx:              qc.loopCtx,
					Condition:        NewTaskCondition(qc.loopCtx),
210 211 212
					triggerCondition: querypb.TriggerCondition_nodeDown,
				},
				LoadBalanceRequest: loadBalanceSegment,
213 214 215 216
				rootCoord:          qc.rootCoordClient,
				dataCoord:          qc.dataCoordClient,
				cluster:            qc.cluster,
				meta:               qc.meta,
217
			}
218
			qc.scheduler.Enqueue([]task{loadBalanceTask})
219 220 221
		}
	}

222
	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, version+1)
223 224 225 226
	for {
		select {
		case <-ctx.Done():
			return
227
		case event := <-qc.eventChan:
228 229 230
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
231
				log.Debug("start add a queryNode to cluster", zap.Any("nodeID", serverID))
232
				err := qc.cluster.registerNode(ctx, event.Session, serverID)
233 234 235
				if err != nil {
					log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
				}
236 237
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
				log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
				_, err := qc.cluster.getNodeByID(serverID)
				if err != nil {
					log.Error("queryNode not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.stopNode(serverID)
				loadBalanceSegment := &querypb.LoadBalanceRequest{
					Base: &commonpb.MsgBase{
						MsgType:  commonpb.MsgType_LoadBalanceSegments,
						SourceID: qc.session.ServerID,
					},
					SourceNodeIDs: []int64{serverID},
					BalanceReason: querypb.TriggerCondition_nodeDown,
				}

				loadBalanceTask := &LoadBalanceTask{
					BaseTask: BaseTask{
						ctx:              qc.loopCtx,
						Condition:        NewTaskCondition(qc.loopCtx),
						triggerCondition: querypb.TriggerCondition_nodeDown,
					},
					LoadBalanceRequest: loadBalanceSegment,
					rootCoord:          qc.rootCoordClient,
					dataCoord:          qc.dataCoordClient,
					cluster:            qc.cluster,
					meta:               qc.meta,
266
				}
267
				qc.scheduler.Enqueue([]task{loadBalanceTask})
268 269 270 271 272
			}
		}
	}
}

273 274
func (qc *QueryCoord) watchMetaLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
275 276

	defer cancel()
277
	defer qc.loopWg.Done()
278
	log.Debug("query coordinator start watch MetaReplica loop")
279

280
	watchChan := qc.kvClient.WatchWithPrefix("queryNode-segmentMeta")
281 282 283 284 285 286

	for {
		select {
		case <-ctx.Done():
			return
		case resp := <-watchChan:
287
			log.Debug("segment MetaReplica updated.")
288 289 290
			for _, event := range resp.Events {
				segmentID, err := strconv.ParseInt(filepath.Base(string(event.Kv.Key)), 10, 64)
				if err != nil {
291
					log.Error("watch MetaReplica loop error when get segmentID", zap.Any("error", err.Error()))
292 293 294 295
				}
				segmentInfo := &querypb.SegmentInfo{}
				err = proto.UnmarshalText(string(event.Kv.Value), segmentInfo)
				if err != nil {
296
					log.Error("watch MetaReplica loop error when unmarshal", zap.Any("error", err.Error()))
297 298 299 300
				}
				switch event.Type {
				case mvccpb.PUT:
					//TODO::
301
					qc.meta.setSegmentInfo(segmentID, segmentInfo)
302 303 304 305 306 307 308 309
				case mvccpb.DELETE:
					//TODO::
				}
			}
		}
	}

}