handler.go 10.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17 18 19 20 21
package datacoord

import (
	"context"

S
SimFG 已提交
22
	"github.com/milvus-io/milvus/api/commonpb"
23 24 25
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
26
	"github.com/milvus-io/milvus/internal/util/funcutil"
27
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
28
	"go.uber.org/zap"
29 30
)

31
// Handler handles some channel method for ChannelManager
32
type Handler interface {
X
XuanYang-cn 已提交
33 34 35 36
	// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
	GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
	// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
37
	CheckShouldDropChannel(channel string) bool
38
	FinishDropChannel(channel string)
39 40
}

41
// ServerHandler is a helper of Server
42 43 44 45
type ServerHandler struct {
	// s is the Server this handler delegates to; the methods of ServerHandler
	// read its meta, catalog, index coordinator and context.
	s *Server
}

46
// newServerHandler creates a new ServerHandler
47 48 49 50
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

X
XuanYang-cn 已提交
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
// GetDataVChanPositions collects the recovery information of a DML channel for DataNode.
//
// Every segment whose InsertChannel equals channel.Name is examined. A segment is ignored when
//   - partitionID is greater than allPartitionID and the segment belongs to another partition,
//   - it has neither a start position nor a DML position, or
//   - it is still being imported (bulk load).
//
// The remaining segments are classified by state: Dropped segments go to DroppedSegmentIds
// and take no part in the seek position; Flushing and Flushed segments go to FlushedSegmentIds;
// everything else goes to UnflushedSegmentIds.
//
// SeekPosition is the position with the smallest Timestamp among the non-dropped segments,
// using each segment's DML position when present and its start position otherwise. When no
// segment yields a position, channel.StartPositions is used if it is non-nil; otherwise the
// collection's start positions are looked up through GetCollection. SeekPosition stays nil
// if none of these sources produces a position.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
	segments := h.s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return info.InsertChannel == channel.Name
	})
	log.Info("GetDataVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
	)

	flushed := make(typeutil.UniqueSet)
	unflushed := make(typeutil.UniqueSet)
	dropped := make(typeutil.UniqueSet)
	var seekPosition *internalpb.MsgPosition

	for _, seg := range segments {
		// Restrict to the requested partition, if one was requested.
		if partitionID > allPartitionID && seg.PartitionID != partitionID {
			continue
		}
		startPos, dmlPos := seg.GetStartPosition(), seg.GetDmlPosition()
		if startPos == nil && dmlPos == nil {
			// Nothing to seek to for this segment.
			continue
		}
		if seg.GetIsImporting() {
			// Skip bulk load segments.
			continue
		}

		segID := seg.GetID()
		switch seg.GetState() {
		case commonpb.SegmentState_Dropped:
			dropped.Insert(segID)
			// Dropped segments never influence the seek position.
			continue
		case commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed:
			flushed.Insert(segID)
		default:
			unflushed.Insert(segID)
		}

		// Prefer the DML position; fall back to the start position.
		candidate := dmlPos
		if candidate == nil {
			candidate = startPos
		}
		if seekPosition == nil || candidate.Timestamp < seekPosition.Timestamp {
			seekPosition = candidate
		}
	}

	// use collection start position when segment position is not found
	if seekPosition == nil {
		if channel.StartPositions != nil {
			// use passed start positions, skip to ask rootcoord.
			seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
		} else if collection := h.GetCollection(h.s.ctx, channel.CollectionID); collection != nil {
			seekPosition = getCollectionStartPosition(channel.Name, collection)
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
		SeekPosition:        seekPosition,
		FlushedSegmentIds:   flushed.Collect(),
		UnflushedSegmentIds: unflushed.Collect(),
		DroppedSegmentIds:   dropped.Collect(),
	}
}

// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
121 122
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
X
XuanYang-cn 已提交
123
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
124 125
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
126
		return s.InsertChannel == channel.Name
127
	})
128 129 130 131 132 133
	segmentInfos := make(map[int64]*SegmentInfo)
	indexedSegments := FilterInIndexedSegments(h.s.meta, h.s.indexCoord, segments...)
	indexed := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexed.Insert(segment.GetID())
	}
X
XuanYang-cn 已提交
134 135 136 137
	log.Info("GetQueryVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
138
	)
139
	var (
140 141 142
		indexedIDs   = make(typeutil.UniqueSet)
		unIndexedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
143 144
		seekPosition *internalpb.MsgPosition
	)
145 146 147 148 149
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
150 151 152 153
		if s.GetIsImporting() {
			// Skip bulk load segments.
			continue
		}
154
		segmentInfos[s.GetID()] = s
155
		if s.GetState() == commonpb.SegmentState_Dropped {
156
			droppedIDs.Insert(s.GetID())
157
		} else if indexed.Contain(s.GetID()) {
158
			indexedIDs.Insert(s.GetID())
159
		} else {
160
			unIndexedIDs.Insert(s.GetID())
161 162
		}
	}
163
	for id := range unIndexedIDs {
164 165
		// Indexed segments are compacted to a raw segment,
		// replace it with the indexed ones
166
		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
167
			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
168 169 170
			unIndexedIDs.Remove(id)
			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
171
		}
172
	}
173

174
	for id := range indexedIDs {
175 176 177 178
		var segmentPosition *internalpb.MsgPosition
		segment := segmentInfos[id]
		if segment.GetDmlPosition() != nil {
			segmentPosition = segment.GetDmlPosition()
179
		} else {
180
			segmentPosition = segment.GetStartPosition()
181 182
		}

183 184 185 186
		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
187
	for id := range unIndexedIDs {
188
		var segmentPosition *internalpb.MsgPosition
189 190 191
		segment := segmentInfos[id]
		if segment.GetDmlPosition() != nil {
			segmentPosition = segment.GetDmlPosition()
192
		} else {
193
			segmentPosition = segment.GetStartPosition()
194 195 196 197 198 199
		}

		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
200

201 202
	// use collection start position when segment position is not found
	if seekPosition == nil {
203 204 205 206 207 208 209 210
		if channel.StartPositions == nil {
			collection := h.GetCollection(h.s.ctx, channel.CollectionID)
			if collection != nil {
				seekPosition = getCollectionStartPosition(channel.Name, collection)
			}
		} else {
			// use passed start positions, skip to ask rootcoord.
			seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
211 212 213 214
		}
	}

	return &datapb.VchannelInfo{
215 216
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
217
		SeekPosition:        seekPosition,
218 219 220
		FlushedSegmentIds:   indexedIDs.Collect(),
		UnflushedSegmentIds: unIndexedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
221 222 223
	}
}

J
jaime 已提交
224 225
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
	return toMsgPosition(channel, collectionInfo.StartPositions)
226 227 228 229
}

func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
	for _, sp := range startPositions {
230
		if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a new datapb.SegmentInfo that carries only the ID,
// collection, partition, insert channel, row count, state, max row number,
// last expire time and the start/DML positions of info. Every other field,
// ALL binlog info included, is left at its zero value. The copy is shallow:
// the position fields are assigned as-is from info.
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	trimmed := new(datapb.SegmentInfo)

	// Identity and placement.
	trimmed.ID = info.ID
	trimmed.CollectionID = info.CollectionID
	trimmed.PartitionID = info.PartitionID
	trimmed.InsertChannel = info.InsertChannel

	// Size, state and expiry.
	trimmed.NumOfRows = info.NumOfRows
	trimmed.State = info.State
	trimmed.MaxRowNum = info.MaxRowNum
	trimmed.LastExpireTime = info.LastExpireTime

	// Positions.
	trimmed.StartPosition = info.StartPosition
	trimmed.DmlPosition = info.DmlPosition

	return trimmed
}

257
// GetCollection returns collection info with specified collection id
J
jaime 已提交
258
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) *collectionInfo {
259 260 261 262 263 264 265 266 267 268 269 270
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
		return coll
	}
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
	}

	return h.s.meta.GetCollection(collectionID)
}

271
// CheckShouldDropChannel returns whether specified channel is marked to be removed
272
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
273 274 275 276 277 278 279 280 281 282 283 284 285
	/*
		segments := h.s.meta.GetSegmentsByChannel(channel)
		for _, segment := range segments {
			if segment.GetStartPosition() != nil && // filter empty segment
				// FIXME: we filter compaction generated segments
				// because datanode may not know the segment due to the network lag or
				// datacoord crash when handling CompleteCompaction.
				// FIXME: cancel this limitation for #12265
				// need to change a unified DropAndFlush to solve the root problem
				//len(segment.CompactionFrom) == 0 &&
				segment.GetState() != commonpb.SegmentState_Dropped {
				return false
			}
286
		}
287
		return false*/
288
	return h.s.meta.catalog.IsChannelDropped(h.s.ctx, channel)
289 290
}

291 292
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
293
func (h *ServerHandler) FinishDropChannel(channel string) {
294
	h.s.meta.catalog.DropChannel(h.s.ctx, channel)
295
}