// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11
package datacoord
S
sunby 已提交
12 13

import (
S
sunby 已提交
14
	"fmt"
S
sunby 已提交
15 16
	"sync"

X
Xiangyu Wang 已提交
17 18 19
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
20 21
	"go.uber.org/zap"
	"golang.org/x/net/context"
S
sunby 已提交
22 23
)

24 25 26 27 28
type cluster struct {
	mu             sync.RWMutex
	ctx            context.Context
	dataManager    *clusterNodeManager
	sessionManager sessionManager
29
	posProvider    positionProvider
30 31 32 33

	startupPolicy    clusterStartupPolicy
	registerPolicy   dataNodeRegisterPolicy
	unregisterPolicy dataNodeUnregisterPolicy
S
sunby 已提交
34
	assignPolicy     channelAssignPolicy
N
neza2017 已提交
35
}
36 37 38

type clusterOption struct {
	apply func(c *cluster)
N
neza2017 已提交
39
}
S
sunby 已提交
40

41 42 43 44
func withStartupPolicy(p clusterStartupPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.startupPolicy = p },
	}
S
sunby 已提交
45 46
}

47 48 49
func withRegisterPolicy(p dataNodeRegisterPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.registerPolicy = p },
S
sunby 已提交
50 51 52
	}
}

53 54 55
func withUnregistorPolicy(p dataNodeUnregisterPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.unregisterPolicy = p },
S
sunby 已提交
56 57 58
	}
}

59 60
func withAssignPolicy(p channelAssignPolicy) clusterOption {
	return clusterOption{
S
sunby 已提交
61
		apply: func(c *cluster) { c.assignPolicy = p },
S
sunby 已提交
62 63 64
	}
}

65
func defaultStartupPolicy() clusterStartupPolicy {
S
sunby 已提交
66
	return newWatchRestartsStartupPolicy()
S
sunby 已提交
67 68
}

69
func defaultRegisterPolicy() dataNodeRegisterPolicy {
70
	return newAssiggBufferRegisterPolicy()
71 72 73
}

func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
74
	return &randomAssignUnregisterPolicy{}
75 76 77
}

func defaultAssignPolicy() channelAssignPolicy {
S
sunby 已提交
78
	return newBalancedAssignPolicy()
79 80
}

81
func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionManager sessionManager, posProvider positionProvider, opts ...clusterOption) *cluster {
82 83 84 85
	c := &cluster{
		ctx:              ctx,
		sessionManager:   sessionManager,
		dataManager:      dataManager,
86
		posProvider:      posProvider,
87 88 89
		startupPolicy:    defaultStartupPolicy(),
		registerPolicy:   defaultRegisterPolicy(),
		unregisterPolicy: defaultUnregisterPolicy(),
S
sunby 已提交
90
		assignPolicy:     defaultAssignPolicy(),
S
sunby 已提交
91
	}
92 93
	for _, opt := range opts {
		opt.apply(c)
S
sunby 已提交
94
	}
95 96 97 98 99 100

	return c
}

func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
	deltaChange := c.dataManager.updateCluster(dataNodes)
101 102 103 104
	nodes, chanBuffer := c.dataManager.getDataNodes(false)
	var rets []*datapb.DataNodeInfo
	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
	c.dataManager.updateDataNodes(rets, chanBuffer)
105
	rets = c.watch(rets)
106
	c.dataManager.updateDataNodes(rets, chanBuffer)
107 108 109 110 111
	return nil
}

func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo {
	for _, n := range nodes {
S
sunby 已提交
112 113
		logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
		uncompletes := make([]vchannel, 0, len(n.Channels))
114 115
		for _, ch := range n.Channels {
			if ch.State == datapb.ChannelWatchState_Uncomplete {
S
sunby 已提交
116 117 118 119 120
				if len(uncompletes) == 0 {
					logMsg += ch.Name
				} else {
					logMsg += "," + ch.Name
				}
121 122 123 124
				uncompletes = append(uncompletes, vchannel{
					CollectionID: ch.CollectionID,
					DmlChannel:   ch.Name,
				})
125 126
			}
		}
S
sunby 已提交
127 128 129 130 131 132

		if len(uncompletes) == 0 {
			continue
		}
		log.Debug(logMsg)

S
sunby 已提交
133
		vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
134 135 136 137
		if err != nil {
			log.Warn("get vchannel position failed", zap.Error(err))
			continue
		}
138 139 140 141 142 143
		cli, err := c.sessionManager.getOrCreateSession(n.Address)
		if err != nil {
			log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
			continue
		}
		req := &datapb.WatchDmChannelsRequest{
S
sunby 已提交
144
			Base: &commonpb.MsgBase{
145
				SourceID: Params.NodeID,
S
sunby 已提交
146
			},
147
			Vchannels: vchanInfos,
148 149 150 151 152 153 154 155
		}
		resp, err := cli.WatchDmChannels(c.ctx, req)
		if err != nil {
			log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
			continue
		}
		if resp.ErrorCode != commonpb.ErrorCode_Success {
			log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
S
sunby 已提交
156 157
			continue
		}
158 159 160 161 162
		for _, ch := range n.Channels {
			if ch.State == datapb.ChannelWatchState_Uncomplete {
				ch.State = datapb.ChannelWatchState_Complete
			}
		}
S
sunby 已提交
163
	}
164
	return nodes
S
sunby 已提交
165 166
}

167 168 169 170
func (c *cluster) register(n *datapb.DataNodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dataManager.register(n)
171 172
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	var rets []*datapb.DataNodeInfo
173
	log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
174
	rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
175
	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
176
	c.dataManager.updateDataNodes(rets, chanBuffer)
177
	rets = c.watch(rets)
178
	c.dataManager.updateDataNodes(rets, chanBuffer)
179 180 181 182 183
}

func (c *cluster) unregister(n *datapb.DataNodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
184

S
sunby 已提交
185
	c.sessionManager.releaseSession(n.Address)
186 187 188 189
	oldNode := c.dataManager.unregister(n)
	if oldNode != nil {
		n = oldNode
	}
190
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
191
	log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
192 193 194 195 196 197 198 199 200
	var rets []*datapb.DataNodeInfo
	if len(cNodes) == 0 {
		for _, chStat := range n.Channels {
			chStat.State = datapb.ChannelWatchState_Uncomplete
			chanBuffer = append(chanBuffer, chStat)
		}
	} else {
		rets = c.unregisterPolicy.apply(cNodes, n)
	}
201
	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
202
	c.dataManager.updateDataNodes(rets, chanBuffer)
203
	rets = c.watch(rets)
204
	c.dataManager.updateDataNodes(rets, chanBuffer)
205 206
}

207
func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
208 209
	c.mu.Lock()
	defer c.mu.Unlock()
210 211 212 213 214 215 216 217 218 219 220 221
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	var rets []*datapb.DataNodeInfo
	if len(cNodes) == 0 { // no nodes to assign, put into buffer
		chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
			Name:         channel,
			CollectionID: collectionID,
			State:        datapb.ChannelWatchState_Uncomplete,
		})
	} else {
		rets = c.assignPolicy.apply(cNodes, channel, collectionID)
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
222
	rets = c.watch(rets)
223
	c.dataManager.updateDataNodes(rets, chanBuffer)
224 225 226 227 228 229 230 231 232 233 234
}

func (c *cluster) flush(segments []*datapb.SegmentInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()

	m := make(map[string]map[UniqueID][]UniqueID) // channel-> map[collectionID]segmentIDs

	for _, seg := range segments {
		if _, ok := m[seg.InsertChannel]; !ok {
			m[seg.InsertChannel] = make(map[UniqueID][]UniqueID)
S
sunby 已提交
235
		}
236 237

		m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID)
S
sunby 已提交
238 239
	}

240
	dataNodes, _ := c.dataManager.getDataNodes(true)
241 242 243 244 245

	channel2Node := make(map[string]string)
	for _, node := range dataNodes {
		for _, chstatus := range node.Channels {
			channel2Node[chstatus.Name] = node.Address
S
sunby 已提交
246 247
		}
	}
S
sunby 已提交
248

249 250 251 252 253 254 255 256
	for ch, coll2seg := range m {
		node, ok := channel2Node[ch]
		if !ok {
			continue
		}
		cli, err := c.sessionManager.getOrCreateSession(node)
		if err != nil {
			log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
S
sunby 已提交
257 258
			continue
		}
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
		for coll, segs := range coll2seg {
			req := &datapb.FlushSegmentsRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Flush,
					SourceID: Params.NodeID,
				},
				CollectionID: coll,
				SegmentIDs:   segs,
			}
			resp, err := cli.FlushSegments(c.ctx, req)
			if err != nil {
				log.Warn("flush segment failed", zap.String("addr", node), zap.Error(err))
				continue
			}
			if resp.ErrorCode != commonpb.ErrorCode_Success {
				log.Warn("flush segment failed", zap.String("dataNode", node), zap.Error(err))
				continue
			}
			log.Debug("flush segments succeed", zap.Any("segmentIDs", segs))
		}
S
sunby 已提交
279 280
	}
}
S
sunby 已提交
281

282 283 284 285
func (c *cluster) releaseSessions() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sessionManager.release()
S
sunby 已提交
286
}