handler.go 12.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17 18 19 20 21
package datacoord

import (
	"context"

S
SimFG 已提交
22
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
23 24 25
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
26
	"github.com/milvus-io/milvus/internal/util/funcutil"
27
	"github.com/milvus-io/milvus/internal/util/tsoutil"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
29
	"go.uber.org/zap"
30 31
)

32
// Handler handles some channel method for ChannelManager
33
type Handler interface {
X
XuanYang-cn 已提交
34 35 36 37
	// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
	GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
	// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
38
	CheckShouldDropChannel(channel string) bool
39
	FinishDropChannel(channel string) error
40
	GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
41 42
}

43
// ServerHandler is a helper of Server
44 45 46 47
type ServerHandler struct {
	s *Server
}

48
// newServerHandler creates a new ServerHandler
49 50 51 52
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

X
XuanYang-cn 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	log.Info("GetDataVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
	)
	var (
		flushedIDs   = make(typeutil.UniqueSet)
		unflushedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
	)
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
		if s.GetIsImporting() {
G
groot 已提交
74
			// Skip bulk insert segments.
X
XuanYang-cn 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
			continue
		}

		if s.GetState() == commonpb.SegmentState_Dropped {
			droppedIDs.Insert(s.GetID())
			continue
		} else if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
			flushedIDs.Insert(s.GetID())
		} else {
			unflushedIDs.Insert(s.GetID())
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
91
		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
X
XuanYang-cn 已提交
92 93 94 95 96 97 98
		FlushedSegmentIds:   flushedIDs.Collect(),
		UnflushedSegmentIds: unflushedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
	}
}

// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
99 100
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
X
XuanYang-cn 已提交
101
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
102 103
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
104
		return s.InsertChannel == channel.Name
105
	})
106
	segmentInfos := make(map[int64]*SegmentInfo)
107
	indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
108 109 110 111
	indexed := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexed.Insert(segment.GetID())
	}
X
XuanYang-cn 已提交
112 113 114 115
	log.Info("GetQueryVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
116
	)
117
	var (
118 119 120
		indexedIDs   = make(typeutil.UniqueSet)
		unIndexedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
121
	)
122 123 124 125 126
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
127
		if s.GetIsImporting() {
G
groot 已提交
128
			// Skip bulk insert segments.
129 130
			continue
		}
131
		segmentInfos[s.GetID()] = s
132
		if s.GetState() == commonpb.SegmentState_Dropped {
133
			droppedIDs.Insert(s.GetID())
134
		} else if indexed.Contain(s.GetID()) {
135
			indexedIDs.Insert(s.GetID())
136
		} else {
137
			unIndexedIDs.Insert(s.GetID())
138 139
		}
	}
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
	hasUnIndexed := true
	for hasUnIndexed {
		hasUnIndexed = false
		for id := range unIndexedIDs {
			// Indexed segments are compacted to a raw segment,
			// replace it with the indexed ones
			if len(segmentInfos[id].GetCompactionFrom()) > 0 {
				unIndexedIDs.Remove(id)
				for _, segID := range segmentInfos[id].GetCompactionFrom() {
					if indexed.Contain(segID) {
						indexedIDs.Insert(segID)
					} else {
						unIndexedIDs.Insert(segID)
						hasUnIndexed = true
					}
				}
				droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
			}
158
		}
159
	}
160

161 162 163
	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
164
		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
165 166 167
		FlushedSegmentIds:   indexedIDs.Collect(),
		UnflushedSegmentIds: unIndexedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
168
	}
169
}
170

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
// getEarliestSegmentDMLPos returns the earliest dml position of segments,
// this is mainly for COMPATIBILITY with old version <=2.1.x
//
// For each segment the dml position is used when present, otherwise the start
// position. Dropped segments, importing segments, segments of other partitions
// (only checked if partitionID > allPartitionID) and segments without any
// position are not considered. Returns nil when no segment qualifies.
func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
	var (
		earliest      *internalpb.MsgPosition
		earliestSegID int64
		earliestTs    uint64
	)
	candidates := h.s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return info.InsertChannel == channel.Name
	})
	for _, seg := range candidates {
		if partitionID > allPartitionID && seg.PartitionID != partitionID {
			continue
		}
		if seg.GetStartPosition() == nil && seg.GetDmlPosition() == nil {
			continue
		}
		// Skip bulk insert segments and dropped segments.
		if seg.GetIsImporting() || seg.GetState() == commonpb.SegmentState_Dropped {
			continue
		}

		// Prefer the dml position, fall back to the start position.
		pos := seg.GetDmlPosition()
		if pos == nil {
			pos = seg.GetStartPosition()
		}
		if earliest != nil && pos.Timestamp >= earliest.Timestamp {
			continue
		}
		earliest = pos
		earliestSegID = seg.GetID()
		earliestTs = pos.GetTimestamp()
	}
	if earliest == nil {
		return nil
	}
	log.Info("getEarliestSegmentDMLPos done",
		zap.Int64("segment ID", earliestSegID),
		zap.Uint64("posTs", earliestTs),
		zap.Time("posTime", tsoutil.PhysicalTime(earliestTs)))
	return earliest
}

// getCollectionStartPos returns collection start position.
func (h *ServerHandler) getCollectionStartPos(channel *channel) *internalpb.MsgPosition {
	// use collection start position when segment position is not found
	var startPosition *internalpb.MsgPosition
	if channel.StartPositions == nil {
		collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
		if collection != nil && err == nil {
			startPosition = getCollectionStartPosition(channel.Name, collection)
		}
		log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
224
			zap.String("channel", channel.Name),
225 226
			zap.Uint64("posTs", startPosition.GetTimestamp()),
			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
227 228
		)
	} else {
229 230 231 232 233 234 235 236 237 238 239 240
		// use passed start positions, skip to ask RootCoord.
		startPosition = toMsgPosition(channel.Name, channel.StartPositions)
		log.Info("segment position not found, setting channel seek position to channel start position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", startPosition.GetTimestamp()),
			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
		)
	}
	return startPosition
}

// GetChannelSeekPosition gets channel seek position from:
241 242 243 244
//  1. Channel checkpoint meta;
//  2. Segments earliest dml position;
//  3. Collection start position;
//     And would return if any position is valid.
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
	var seekPosition *internalpb.MsgPosition
	seekPosition = h.s.meta.GetChannelCheckpoint(channel.Name)
	if seekPosition != nil {
		log.Info("channel seek position set from channel checkpoint meta",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
	}

	seekPosition = h.getEarliestSegmentDMLPos(channel, partitionID)
	if seekPosition != nil {
		log.Info("channel seek position set from earliest segment dml position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
263
	}
264 265 266 267 268 269 270 271 272 273 274 275 276

	seekPosition = h.getCollectionStartPos(channel)
	if seekPosition != nil {
		log.Info("channel seek position set from collection start position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
	}

	log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid",
		zap.String("channel", channel.Name))
	return nil
277 278
}

J
jaime 已提交
279 280
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
	return toMsgPosition(channel, collectionInfo.StartPositions)
281 282 283 284
}

func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
	for _, sp := range startPositions {
285
		if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo that carries
// only the identity, row count, state, expire time and position fields;
// every other field, including ALL binlog info, is left at its zero value.
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	trimmed := new(datapb.SegmentInfo)
	trimmed.ID = info.ID
	trimmed.CollectionID = info.CollectionID
	trimmed.PartitionID = info.PartitionID
	trimmed.InsertChannel = info.InsertChannel
	trimmed.NumOfRows = info.NumOfRows
	trimmed.State = info.State
	trimmed.MaxRowNum = info.MaxRowNum
	trimmed.LastExpireTime = info.LastExpireTime
	// Positions are shared with the source, not deep-copied.
	trimmed.StartPosition = info.StartPosition
	trimmed.DmlPosition = info.DmlPosition
	return trimmed
}

312
// GetCollection returns collection info with specified collection id
313
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
314 315
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
316
		return coll, nil
317 318 319 320
	}
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
321
		return nil, err
322 323
	}

324
	return h.s.meta.GetCollection(collectionID), nil
325 326
}

327
// CheckShouldDropChannel returns whether specified channel is marked to be removed
328
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
329 330 331 332 333 334 335 336 337 338 339 340 341
	/*
		segments := h.s.meta.GetSegmentsByChannel(channel)
		for _, segment := range segments {
			if segment.GetStartPosition() != nil && // filter empty segment
				// FIXME: we filter compaction generated segments
				// because datanode may not know the segment due to the network lag or
				// datacoord crash when handling CompleteCompaction.
				// FIXME: cancel this limitation for #12265
				// need to change a unified DropAndFlush to solve the root problem
				//len(segment.CompactionFrom) == 0 &&
				segment.GetState() != commonpb.SegmentState_Dropped {
				return false
			}
342
		}
343
		return false*/
344
	return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
345 346
}

347 348
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
349 350 351 352 353 354
func (h *ServerHandler) FinishDropChannel(channel string) error {
	err := h.s.meta.catalog.DropChannel(h.s.ctx, channel)
	if err != nil {
		log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
		return err
	}
355 356
	log.Info("DropChannel succeeded", zap.String("vChannel", channel))
	// Channel checkpoints are cleaned up during garbage collection.
357
	return nil
358
}