// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"go.uber.org/zap"
)

32
// Handler handles some channel method for ChannelManager
33
type Handler interface {
X
XuanYang-cn 已提交
34 35 36 37
	// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
	GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
	// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
38
	CheckShouldDropChannel(channel string) bool
39
	FinishDropChannel(channel string) error
40
	GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
41 42
}

43
// ServerHandler is a helper of Server
44 45 46 47
type ServerHandler struct {
	s *Server
}

48
// newServerHandler creates a new ServerHandler
49 50 51 52
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

X
XuanYang-cn 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	log.Info("GetDataVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
	)
	var (
		flushedIDs   = make(typeutil.UniqueSet)
		unflushedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
	)
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
		if s.GetIsImporting() {
G
groot 已提交
74
			// Skip bulk insert segments.
X
XuanYang-cn 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
			continue
		}

		if s.GetState() == commonpb.SegmentState_Dropped {
			droppedIDs.Insert(s.GetID())
			continue
		} else if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
			flushedIDs.Insert(s.GetID())
		} else {
			unflushedIDs.Insert(s.GetID())
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
91
		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
X
XuanYang-cn 已提交
92 93 94 95 96 97 98
		FlushedSegmentIds:   flushedIDs.Collect(),
		UnflushedSegmentIds: unflushedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
	}
}

// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
99 100
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
X
XuanYang-cn 已提交
101
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
102 103
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
104
		return s.InsertChannel == channel.Name
105
	})
106
	segmentInfos := make(map[int64]*SegmentInfo)
107
	indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
108 109 110 111
	indexed := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexed.Insert(segment.GetID())
	}
X
XuanYang-cn 已提交
112 113 114 115
	log.Info("GetQueryVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
116
	)
117
	var (
118 119 120
		indexedIDs   = make(typeutil.UniqueSet)
		unIndexedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
121
	)
122 123 124 125 126
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
127
		if s.GetIsImporting() {
G
groot 已提交
128
			// Skip bulk insert segments.
129 130
			continue
		}
131
		segmentInfos[s.GetID()] = s
132
		if s.GetState() == commonpb.SegmentState_Dropped {
133
			droppedIDs.Insert(s.GetID())
134
		} else if indexed.Contain(s.GetID()) {
135
			indexedIDs.Insert(s.GetID())
136
		} else {
137
			unIndexedIDs.Insert(s.GetID())
138 139
		}
	}
C
cai.zhang 已提交
140 141 142 143 144 145 146 147
	for id := range unIndexedIDs {
		// Indexed segments are compacted to a raw segment,
		// replace it with the indexed ones
		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
			unIndexedIDs.Remove(id)
			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
148
		}
149
	}
150

151 152 153
	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
154
		SeekPosition:        h.GetChannelSeekPosition(channel, partitionID),
155 156 157
		FlushedSegmentIds:   indexedIDs.Collect(),
		UnflushedSegmentIds: unIndexedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
158
	}
159
}
160

161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
// getEarliestSegmentDMLPos returns the earliest dml position of segments,
// this is mainly for COMPATIBILITY with old version <=2.1.x
func (h *ServerHandler) getEarliestSegmentDMLPos(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
	var minPos *internalpb.MsgPosition
	var minPosSegID int64
	var minPosTs uint64
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
		if s.GetIsImporting() {
			// Skip bulk insert segments.
			continue
		}
		if s.GetState() == commonpb.SegmentState_Dropped {
			continue
		}

		var segmentPosition *internalpb.MsgPosition
		if s.GetDmlPosition() != nil {
			segmentPosition = s.GetDmlPosition()
		} else {
			segmentPosition = s.GetStartPosition()
		}
		if minPos == nil || segmentPosition.Timestamp < minPos.Timestamp {
			minPosSegID = s.GetID()
			minPosTs = segmentPosition.GetTimestamp()
			minPos = segmentPosition
		}
	}
	if minPos != nil {
		log.Info("getEarliestSegmentDMLPos done",
			zap.Int64("segment ID", minPosSegID),
			zap.Uint64("posTs", minPosTs),
			zap.Time("posTime", tsoutil.PhysicalTime(minPosTs)))
	}
	return minPos
}

// getCollectionStartPos returns collection start position.
func (h *ServerHandler) getCollectionStartPos(channel *channel) *internalpb.MsgPosition {
	// use collection start position when segment position is not found
	var startPosition *internalpb.MsgPosition
	if channel.StartPositions == nil {
		collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
		if collection != nil && err == nil {
			startPosition = getCollectionStartPosition(channel.Name, collection)
		}
		log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
214
			zap.String("channel", channel.Name),
215 216
			zap.Uint64("posTs", startPosition.GetTimestamp()),
			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
217 218
		)
	} else {
219 220 221 222 223 224 225 226 227 228 229 230
		// use passed start positions, skip to ask RootCoord.
		startPosition = toMsgPosition(channel.Name, channel.StartPositions)
		log.Info("segment position not found, setting channel seek position to channel start position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", startPosition.GetTimestamp()),
			zap.Time("posTime", tsoutil.PhysicalTime(startPosition.GetTimestamp())),
		)
	}
	return startPosition
}

// GetChannelSeekPosition gets channel seek position from:
231 232 233 234
//  1. Channel checkpoint meta;
//  2. Segments earliest dml position;
//  3. Collection start position;
//     And would return if any position is valid.
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
func (h *ServerHandler) GetChannelSeekPosition(channel *channel, partitionID UniqueID) *internalpb.MsgPosition {
	var seekPosition *internalpb.MsgPosition
	seekPosition = h.s.meta.GetChannelCheckpoint(channel.Name)
	if seekPosition != nil {
		log.Info("channel seek position set from channel checkpoint meta",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
	}

	seekPosition = h.getEarliestSegmentDMLPos(channel, partitionID)
	if seekPosition != nil {
		log.Info("channel seek position set from earliest segment dml position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
253
	}
254 255 256 257 258 259 260 261 262 263 264 265 266

	seekPosition = h.getCollectionStartPos(channel)
	if seekPosition != nil {
		log.Info("channel seek position set from collection start position",
			zap.String("channel", channel.Name),
			zap.Uint64("posTs", seekPosition.Timestamp),
			zap.Time("posTime", tsoutil.PhysicalTime(seekPosition.GetTimestamp())))
		return seekPosition
	}

	log.Warn("get channel checkpoint failed, channelCPMeta and earliestSegDMLPos and collStartPos are all invalid",
		zap.String("channel", channel.Name))
	return nil
267 268
}

J
jaime 已提交
269 270
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
	return toMsgPosition(channel, collectionInfo.StartPositions)
271 272 273 274
}

func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
	for _, sp := range startPositions {
275
		if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a new datapb.SegmentInfo that keeps only the basic
// descriptive fields of info: ID, CollectionID, PartitionID, InsertChannel,
// NumOfRows, State, MaxRowNum, LastExpireTime, StartPosition and DmlPosition.
// Every other field, including ALL binlog info, is left unset. The copy is
// shallow: the position pointers are shared with info.
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	trimmed := &datapb.SegmentInfo{}
	trimmed.ID = info.ID
	trimmed.CollectionID = info.CollectionID
	trimmed.PartitionID = info.PartitionID
	trimmed.InsertChannel = info.InsertChannel
	trimmed.NumOfRows = info.NumOfRows
	trimmed.State = info.State
	trimmed.MaxRowNum = info.MaxRowNum
	trimmed.LastExpireTime = info.LastExpireTime
	trimmed.StartPosition = info.StartPosition
	trimmed.DmlPosition = info.DmlPosition
	return trimmed
}

302
// GetCollection returns collection info with specified collection id
303
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
304 305
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
306
		return coll, nil
307 308 309 310
	}
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
311
		return nil, err
312 313
	}

314
	return h.s.meta.GetCollection(collectionID), nil
315 316
}

317
// CheckShouldDropChannel returns whether specified channel is marked to be removed
318
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
319 320 321 322 323 324 325 326 327 328 329 330 331
	/*
		segments := h.s.meta.GetSegmentsByChannel(channel)
		for _, segment := range segments {
			if segment.GetStartPosition() != nil && // filter empty segment
				// FIXME: we filter compaction generated segments
				// because datanode may not know the segment due to the network lag or
				// datacoord crash when handling CompleteCompaction.
				// FIXME: cancel this limitation for #12265
				// need to change a unified DropAndFlush to solve the root problem
				//len(segment.CompactionFrom) == 0 &&
				segment.GetState() != commonpb.SegmentState_Dropped {
				return false
			}
332
		}
333
		return false*/
334
	return h.s.meta.catalog.ShouldDropChannel(h.s.ctx, channel)
335 336
}

337 338
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
339 340 341 342 343 344
func (h *ServerHandler) FinishDropChannel(channel string) error {
	err := h.s.meta.catalog.DropChannel(h.s.ctx, channel)
	if err != nil {
		log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
		return err
	}
345 346
	log.Info("DropChannel succeeded", zap.String("vChannel", channel))
	// Channel checkpoints are cleaned up during garbage collection.
347
	return nil
348
}