handler.go 12.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17 18 19 20 21
package datacoord

import (
	"context"

S
SimFG 已提交
22
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
23 24 25
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
26
	"github.com/milvus-io/milvus/internal/util/funcutil"
27
	"github.com/milvus-io/milvus/internal/util/tsoutil"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
29
	"go.uber.org/zap"
30 31
)

32
// Handler handles some channel method for ChannelManager
33
type Handler interface {
X
XuanYang-cn 已提交
34
	// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
35
	GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error)
X
XuanYang-cn 已提交
36 37
	// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
38
	CheckShouldDropChannel(channel string) bool
39
	FinishDropChannel(channel string) error
40
	GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
41 42
}

43
// ServerHandler is a helper of Server
44 45 46 47
type ServerHandler struct {
	s *Server
}

48
// newServerHandler creates a new ServerHandler
49 50 51 52
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

X
XuanYang-cn 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	log.Info("GetDataVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
	)
	var (
		flushedIDs   = make(typeutil.UniqueSet)
		unflushedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
	)
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
		if s.GetIsImporting() {
G
groot 已提交
74
			// Skip bulk insert segments.
X
XuanYang-cn 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
			continue
		}

		if s.GetState() == commonpb.SegmentState_Dropped {
			droppedIDs.Insert(s.GetID())
			continue
		} else if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
			flushedIDs.Insert(s.GetID())
		} else {
			unflushedIDs.Insert(s.GetID())
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
91
		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
X
XuanYang-cn 已提交
92 93 94 95 96 97 98
		FlushedSegmentIds:   flushedIDs.Collect(),
		UnflushedSegmentIds: unflushedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
	}
}

// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
99 100
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
101
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) (*datapb.VchannelInfo, error) {
102 103
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
104
		return s.InsertChannel == channel.Name
105
	})
106
	segmentInfos := make(map[int64]*SegmentInfo)
107 108 109 110 111
	indexedSegments, err := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
	if err != nil {
		log.Warn("filter indexed segment failed", zap.String("channel", channel.Name), zap.Error(err))
		return nil, err
	}
112 113 114 115
	indexed := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexed.Insert(segment.GetID())
	}
X
XuanYang-cn 已提交
116 117 118 119
	log.Info("GetQueryVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
120
	)
121
	var (
122 123 124
		indexedIDs   = make(typeutil.UniqueSet)
		unIndexedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
125
	)
126 127 128 129 130
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
131
		if s.GetIsImporting() {
G
groot 已提交
132
			// Skip bulk insert segments.
133 134
			continue
		}
135
		segmentInfos[s.GetID()] = s
136
		if s.GetState() == commonpb.SegmentState_Dropped {
137
			droppedIDs.Insert(s.GetID())
138
		} else if indexed.Contain(s.GetID()) {
139
			indexedIDs.Insert(s.GetID())
140
		} else {
141
			unIndexedIDs.Insert(s.GetID())
142 143
		}
	}
144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
	hasUnIndexed := true
	for hasUnIndexed {
		hasUnIndexed = false
		for id := range unIndexedIDs {
			// Indexed segments are compacted to a raw segment,
			// replace it with the indexed ones
			if len(segmentInfos[id].GetCompactionFrom()) > 0 {
				unIndexedIDs.Remove(id)
				for _, segID := range segmentInfos[id].GetCompactionFrom() {
					if indexed.Contain(segID) {
						indexedIDs.Insert(segID)
					} else {
						unIndexedIDs.Insert(segID)
						hasUnIndexed = true
					}
				}
				droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
			}
162
		}
163
	}
164

165 166 167
	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
168
		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
169 170 171
		FlushedSegmentIds:   indexedIDs.Collect(),
		UnflushedSegmentIds: unIndexedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
172
	}, nil
173
}
174

175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
// getEarliestSegmentDMLPos returns the position with the smallest timestamp
// among the channel's segments, or nil when no segment qualifies. For each
// segment its DML position is used, falling back to its start position.
// Segments from other partitions (when partitionID > allPartitionID), segments
// without any position, bulk insert (importing) segments and dropped segments
// are ignored.
// This is mainly for COMPATIBILITY with old version <=2.1.x.
func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
	var (
		earliest      *internalpb.MsgPosition
		earliestSegID int64
	)
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	for _, seg := range segments {
		if partitionID > allPartitionID && seg.PartitionID != partitionID {
			continue
		}
		if seg.GetStartPosition() == nil && seg.GetDmlPosition() == nil {
			continue
		}
		// Skip bulk insert segments as well as already dropped ones.
		if seg.GetIsImporting() || seg.GetState() == commonpb.SegmentState_Dropped {
			continue
		}

		pos := seg.GetDmlPosition()
		if pos == nil {
			pos = seg.GetStartPosition()
		}
		// Keep the first segment seen on timestamp ties.
		if earliest != nil && pos.Timestamp >= earliest.Timestamp {
			continue
		}
		earliest = pos
		earliestSegID = seg.GetID()
	}

	if earliest == nil {
		return nil
	}
	ts := earliest.GetTimestamp()
	log.Info("getEarliestSegmentDMLPos done",
		zap.Int64("segment ID", earliestSegID),
		zap.Uint64("posTs", ts),
		zap.Time("posTime", tsoutil.PhysicalTime(ts)))
	return earliest
}

// getCollectionStartPos returns collection start position.
func (h *ServerHandler) getCollectionStartPos(channel *channel) *internalpb.MsgPosition {
	// use collection start position when segment position is not found
	var startPosition *internalpb.MsgPosition
	if channel.StartPositions == nil {
		collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
		if collection != nil && err == nil {
			startPosition = getCollectionStartPosition(channel.Name, collection)
		}
		log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
228
			zap.String("channel", channel.Name),
229 230
			zap.Uint64("posTs", startPosition.GetTimestamp()),
			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
231 232
		)
	} else {
233 234 235 236 237 238 239 240 241 242 243 244
		// use passed start positions, skip to ask RootCoord.
		startPosition = toMsgPosition(channel.Name, channel.StartPositions)
		log.Info("segment position not found, setting channel seek position to channel start position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", startPosition.GetTimestamp()),
			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
		)
	}
	return startPosition
}

// GetChannelSeekPosition gets channel seek position from:
245 246 247 248
//  1. Channel checkpoint meta;
//  2. Segments earliest dml position;
//  3. Collection start position;
//     And would return if any position is valid.
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266
func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
	var seekPosition *internalpb.MsgPosition
	seekPosition = h.s.meta.GetChannelCheckpoint(channel.Name)
	if seekPosition != nil {
		log.Info("channel seek position set from channel checkpoint meta",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
	}

	seekPosition = h.getEarliestSegmentDMLPos(channel, partitionID)
	if seekPosition != nil {
		log.Info("channel seek position set from earliest segment dml position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
267
	}
268 269 270 271 272 273 274 275 276 277 278 279 280

	seekPosition = h.getCollectionStartPos(channel)
	if seekPosition != nil {
		log.Info("channel seek position set from collection start position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
	}

	log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid",
		zap.String("channel", channel.Name))
	return nil
281 282
}

J
jaime 已提交
283
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
284 285 286 287 288
	position := toMsgPosition(channel, collectionInfo.StartPositions)
	if position != nil {
		position.Timestamp = collectionInfo.CreatedAt
	}
	return position
289 290 291 292
}

func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
	for _, sp := range startPositions {
293
		if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a new SegmentInfo that copies only ID, CollectionID,
// PartitionID, InsertChannel, NumOfRows, State, MaxRowNum, LastExpireTime,
// StartPosition and DmlPosition from info. Every other field, including all
// binlog info, is left at its zero value. The position pointers are shared
// with info, not cloned.
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	trimmed := &datapb.SegmentInfo{}
	trimmed.ID = info.ID
	trimmed.CollectionID = info.CollectionID
	trimmed.PartitionID = info.PartitionID
	trimmed.InsertChannel = info.InsertChannel
	trimmed.NumOfRows = info.NumOfRows
	trimmed.State = info.State
	trimmed.MaxRowNum = info.MaxRowNum
	trimmed.LastExpireTime = info.LastExpireTime
	trimmed.StartPosition = info.StartPosition
	trimmed.DmlPosition = info.DmlPosition
	return trimmed
}

320
// GetCollection returns collection info with specified collection id
321
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
322 323
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
324
		return coll, nil
325 326 327 328
	}
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
329
		return nil, err
330 331
	}

332
	return h.s.meta.GetCollection(collectionID), nil
333 334
}

335
// CheckShouldDropChannel returns whether specified channel is marked to be removed
336
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
337 338
	return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel) ||
		!h.s.meta.catalog.ChannelExists(h.s.ctx, channel)
339 340
}

341 342
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
343 344 345 346 347 348
func (h *ServerHandler) FinishDropChannel(channel string) error {
	err := h.s.meta.catalog.DropChannel(h.s.ctx, channel)
	if err != nil {
		log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
		return err
	}
349 350
	log.Info("DropChannel succeeded", zap.String("vChannel", channel))
	// Channel checkpoints are cleaned up during garbage collection.
351
	return nil
352
}