cluster.go 8.3 KB
Newer Older
S
sunby 已提交
1 2 3 4 5 6 7 8 9 10
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11
package datacoord
S
sunby 已提交
12 13

import (
S
sunby 已提交
14
	"fmt"
S
sunby 已提交
15 16
	"sync"

X
Xiangyu Wang 已提交
17 18 19
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
20 21
	"go.uber.org/zap"
	"golang.org/x/net/context"
S
sunby 已提交
22 23
)

24 25 26 27 28
type cluster struct {
	mu             sync.RWMutex
	ctx            context.Context
	dataManager    *clusterNodeManager
	sessionManager sessionManager
29
	posProvider    positionProvider
30 31 32 33

	startupPolicy    clusterStartupPolicy
	registerPolicy   dataNodeRegisterPolicy
	unregisterPolicy dataNodeUnregisterPolicy
S
sunby 已提交
34
	assignPolicy     channelAssignPolicy
N
neza2017 已提交
35
}
36 37 38

type clusterOption struct {
	apply func(c *cluster)
N
neza2017 已提交
39
}
S
sunby 已提交
40

41 42 43 44
func withStartupPolicy(p clusterStartupPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.startupPolicy = p },
	}
S
sunby 已提交
45 46
}

47 48 49
func withRegisterPolicy(p dataNodeRegisterPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.registerPolicy = p },
S
sunby 已提交
50 51 52
	}
}

53 54 55
func withUnregistorPolicy(p dataNodeUnregisterPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.unregisterPolicy = p },
S
sunby 已提交
56 57 58
	}
}

59 60
func withAssignPolicy(p channelAssignPolicy) clusterOption {
	return clusterOption{
S
sunby 已提交
61
		apply: func(c *cluster) { c.assignPolicy = p },
S
sunby 已提交
62 63 64
	}
}

65
func defaultStartupPolicy() clusterStartupPolicy {
S
sunby 已提交
66
	return newWatchRestartsStartupPolicy()
S
sunby 已提交
67 68
}

69
func defaultRegisterPolicy() dataNodeRegisterPolicy {
70
	return newAssiggBufferRegisterPolicy()
71 72 73
}

func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
C
congqixia 已提交
74
	return randomAssignRegisterFunc
75 76 77
}

func defaultAssignPolicy() channelAssignPolicy {
S
sunby 已提交
78
	return newBalancedAssignPolicy()
79 80
}

S
sunby 已提交
81 82 83
func newCluster(ctx context.Context, dataManager *clusterNodeManager,
	sessionManager sessionManager, posProvider positionProvider,
	opts ...clusterOption) *cluster {
84 85 86 87
	c := &cluster{
		ctx:              ctx,
		sessionManager:   sessionManager,
		dataManager:      dataManager,
88
		posProvider:      posProvider,
89 90 91
		startupPolicy:    defaultStartupPolicy(),
		registerPolicy:   defaultRegisterPolicy(),
		unregisterPolicy: defaultUnregisterPolicy(),
S
sunby 已提交
92
		assignPolicy:     defaultAssignPolicy(),
S
sunby 已提交
93
	}
94 95
	for _, opt := range opts {
		opt.apply(c)
S
sunby 已提交
96
	}
97 98 99 100 101 102

	return c
}

func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
	deltaChange := c.dataManager.updateCluster(dataNodes)
103 104 105 106
	nodes, chanBuffer := c.dataManager.getDataNodes(false)
	var rets []*datapb.DataNodeInfo
	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
	c.dataManager.updateDataNodes(rets, chanBuffer)
107
	rets = c.watch(rets)
108
	c.dataManager.updateDataNodes(rets, chanBuffer)
109 110 111 112 113
	return nil
}

func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo {
	for _, n := range nodes {
S
sunby 已提交
114 115
		logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
		uncompletes := make([]vchannel, 0, len(n.Channels))
116 117
		for _, ch := range n.Channels {
			if ch.State == datapb.ChannelWatchState_Uncomplete {
S
sunby 已提交
118 119 120 121 122
				if len(uncompletes) == 0 {
					logMsg += ch.Name
				} else {
					logMsg += "," + ch.Name
				}
123 124 125 126
				uncompletes = append(uncompletes, vchannel{
					CollectionID: ch.CollectionID,
					DmlChannel:   ch.Name,
				})
127 128
			}
		}
S
sunby 已提交
129 130 131 132 133 134

		if len(uncompletes) == 0 {
			continue
		}
		log.Debug(logMsg)

S
sunby 已提交
135
		vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
136 137 138 139
		if err != nil {
			log.Warn("get vchannel position failed", zap.Error(err))
			continue
		}
140 141 142 143 144 145
		cli, err := c.sessionManager.getOrCreateSession(n.Address)
		if err != nil {
			log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
			continue
		}
		req := &datapb.WatchDmChannelsRequest{
S
sunby 已提交
146
			Base: &commonpb.MsgBase{
147
				SourceID: Params.NodeID,
S
sunby 已提交
148
			},
149
			Vchannels: vchanInfos,
150 151 152 153 154 155 156 157
		}
		resp, err := cli.WatchDmChannels(c.ctx, req)
		if err != nil {
			log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
			continue
		}
		if resp.ErrorCode != commonpb.ErrorCode_Success {
			log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
S
sunby 已提交
158 159
			continue
		}
160 161 162 163 164
		for _, ch := range n.Channels {
			if ch.State == datapb.ChannelWatchState_Uncomplete {
				ch.State = datapb.ChannelWatchState_Complete
			}
		}
S
sunby 已提交
165
	}
166
	return nodes
S
sunby 已提交
167 168
}

169 170 171 172
func (c *cluster) register(n *datapb.DataNodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dataManager.register(n)
173 174
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	var rets []*datapb.DataNodeInfo
175
	log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
176
	rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
177
	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
178
	c.dataManager.updateDataNodes(rets, chanBuffer)
179
	rets = c.watch(rets)
180
	c.dataManager.updateDataNodes(rets, chanBuffer)
181 182 183 184 185
}

func (c *cluster) unregister(n *datapb.DataNodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
186

S
sunby 已提交
187
	c.sessionManager.releaseSession(n.Address)
188 189 190 191
	oldNode := c.dataManager.unregister(n)
	if oldNode != nil {
		n = oldNode
	}
192
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
193
	log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
194 195 196 197 198 199 200 201 202
	var rets []*datapb.DataNodeInfo
	if len(cNodes) == 0 {
		for _, chStat := range n.Channels {
			chStat.State = datapb.ChannelWatchState_Uncomplete
			chanBuffer = append(chanBuffer, chStat)
		}
	} else {
		rets = c.unregisterPolicy.apply(cNodes, n)
	}
203
	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
204
	c.dataManager.updateDataNodes(rets, chanBuffer)
205
	rets = c.watch(rets)
206
	c.dataManager.updateDataNodes(rets, chanBuffer)
207 208
}

209
func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
210 211
	c.mu.Lock()
	defer c.mu.Unlock()
212 213 214 215 216 217 218 219 220 221 222 223
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	var rets []*datapb.DataNodeInfo
	if len(cNodes) == 0 { // no nodes to assign, put into buffer
		chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
			Name:         channel,
			CollectionID: collectionID,
			State:        datapb.ChannelWatchState_Uncomplete,
		})
	} else {
		rets = c.assignPolicy.apply(cNodes, channel, collectionID)
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
224
	rets = c.watch(rets)
225
	c.dataManager.updateDataNodes(rets, chanBuffer)
226 227 228 229 230 231 232 233 234 235 236
}

func (c *cluster) flush(segments []*datapb.SegmentInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()

	m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs

	for _, seg := range segments {
		if _, ok := m[seg.InsertChannel]; !ok {
			m[seg.InsertChannel] = make(map[UniqueID][]UniqueID)
S
sunby 已提交
237
		}
238 239

		m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID)
S
sunby 已提交
240 241
	}

242
	dataNodes, _ := c.dataManager.getDataNodes(true)
243 244 245 246 247

	channel2Node := make(map[string]string)
	for _, node := range dataNodes {
		for _, chstatus := range node.Channels {
			channel2Node[chstatus.Name] = node.Address
S
sunby 已提交
248 249
		}
	}
S
sunby 已提交
250

251 252 253 254 255 256 257 258
	for ch, coll2seg := range m {
		node, ok := channel2Node[ch]
		if !ok {
			continue
		}
		cli, err := c.sessionManager.getOrCreateSession(node)
		if err != nil {
			log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
S
sunby 已提交
259 260
			continue
		}
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
		for coll, segs := range coll2seg {
			req := &datapb.FlushSegmentsRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Flush,
					SourceID: Params.NodeID,
				},
				CollectionID: coll,
				SegmentIDs:   segs,
			}
			resp, err := cli.FlushSegments(c.ctx, req)
			if err != nil {
				log.Warn("flush segment failed", zap.String("addr", node), zap.Error(err))
				continue
			}
			if resp.ErrorCode != commonpb.ErrorCode_Success {
				log.Warn("flush segment failed", zap.String("dataNode", node), zap.Error(err))
				continue
			}
			log.Debug("flush segments succeed", zap.Any("segmentIDs", segs))
		}
S
sunby 已提交
281 282
	}
}
S
sunby 已提交
283

284 285 286 287
func (c *cluster) releaseSessions() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sessionManager.release()
S
sunby 已提交
288
}