// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"

	"github.com/milvus-io/milvus/api/commonpb"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/util/funcutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"go.uber.org/zap"
)

31
// Handler handles some channel method for ChannelManager
32
type Handler interface {
X
XuanYang-cn 已提交
33 34 35 36
	// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
	GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
	// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
37
	CheckShouldDropChannel(channel string) bool
38
	FinishDropChannel(channel string)
39
	GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
40 41
}

42
// ServerHandler is a helper of Server
43 44 45 46
type ServerHandler struct {
	s *Server
}

47
// newServerHandler creates a new ServerHandler
48 49 50 51
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

X
XuanYang-cn 已提交
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	log.Info("GetDataVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
	)
	var (
		flushedIDs   = make(typeutil.UniqueSet)
		unflushedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
		seekPosition *internalpb.MsgPosition
	)
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
		if s.GetIsImporting() {
			// Skip bulk load segments.
			continue
		}

		if s.GetState() == commonpb.SegmentState_Dropped {
			droppedIDs.Insert(s.GetID())
			continue
		} else if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
			flushedIDs.Insert(s.GetID())
		} else {
			unflushedIDs.Insert(s.GetID())
		}

		var segmentPosition *internalpb.MsgPosition
		if s.GetDmlPosition() != nil {
			segmentPosition = s.GetDmlPosition()
		} else {
			segmentPosition = s.GetStartPosition()
		}
		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}

	// use collection start position when segment position is not found
	if seekPosition == nil {
		if channel.StartPositions == nil {
101 102
			collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
			if collection != nil && err == nil {
X
XuanYang-cn 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
				seekPosition = getCollectionStartPosition(channel.Name, collection)
			}
		} else {
			// use passed start positions, skip to ask rootcoord.
			seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
		SeekPosition:        seekPosition,
		FlushedSegmentIds:   flushedIDs.Collect(),
		UnflushedSegmentIds: unflushedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
	}
}

// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
122 123
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
X
XuanYang-cn 已提交
124
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
125 126
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
127
		return s.InsertChannel == channel.Name
128
	})
129
	segmentInfos := make(map[int64]*SegmentInfo)
130
	indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
131 132 133 134
	indexed := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexed.Insert(segment.GetID())
	}
X
XuanYang-cn 已提交
135 136 137 138
	log.Info("GetQueryVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
139
	)
140
	var (
141 142 143
		indexedIDs   = make(typeutil.UniqueSet)
		unIndexedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
144 145
		seekPosition *internalpb.MsgPosition
	)
146 147 148 149 150
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
151 152 153 154
		if s.GetIsImporting() {
			// Skip bulk load segments.
			continue
		}
155
		segmentInfos[s.GetID()] = s
156
		if s.GetState() == commonpb.SegmentState_Dropped {
157
			droppedIDs.Insert(s.GetID())
158
		} else if indexed.Contain(s.GetID()) {
159
			indexedIDs.Insert(s.GetID())
160
		} else {
161
			unIndexedIDs.Insert(s.GetID())
162 163
		}
	}
164
	for id := range unIndexedIDs {
165 166
		// Indexed segments are compacted to a raw segment,
		// replace it with the indexed ones
167
		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
168
			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
169 170 171
			unIndexedIDs.Remove(id)
			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
172
		}
173
	}
174

175
	for id := range indexedIDs {
176 177 178 179
		var segmentPosition *internalpb.MsgPosition
		segment := segmentInfos[id]
		if segment.GetDmlPosition() != nil {
			segmentPosition = segment.GetDmlPosition()
180
		} else {
181
			segmentPosition = segment.GetStartPosition()
182 183
		}

184 185 186 187
		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
188
	for id := range unIndexedIDs {
189
		var segmentPosition *internalpb.MsgPosition
190 191 192
		segment := segmentInfos[id]
		if segment.GetDmlPosition() != nil {
			segmentPosition = segment.GetDmlPosition()
193
		} else {
194
			segmentPosition = segment.GetStartPosition()
195 196 197 198 199 200
		}

		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
201

202 203
	// use collection start position when segment position is not found
	if seekPosition == nil {
204
		if channel.StartPositions == nil {
205 206
			collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
			if collection != nil && err == nil {
207 208 209 210 211
				seekPosition = getCollectionStartPosition(channel.Name, collection)
			}
		} else {
			// use passed start positions, skip to ask rootcoord.
			seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
212 213 214 215
		}
	}

	return &datapb.VchannelInfo{
216 217
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
218
		SeekPosition:        seekPosition,
219 220 221
		FlushedSegmentIds:   indexedIDs.Collect(),
		UnflushedSegmentIds: unIndexedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
222 223 224
	}
}

J
jaime 已提交
225 226
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
	return toMsgPosition(channel, collectionInfo.StartPositions)
227 228 229 230
}

func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
	for _, sp := range startPositions {
231
		if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo builds a new datapb.SegmentInfo that copies only these
// fields of info: identity and placement, row counts, state, expiry time and
// stream positions.
//
// Every other field, including all binlog information, is left at its zero
// value. Pointer-valued fields (the positions) are shared with info rather
// than deep-copied. info must be non-nil.
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	trimmed := new(datapb.SegmentInfo)

	// Identity and placement.
	trimmed.ID = info.ID
	trimmed.CollectionID = info.CollectionID
	trimmed.PartitionID = info.PartitionID
	trimmed.InsertChannel = info.InsertChannel

	// Size and lifecycle.
	trimmed.NumOfRows = info.NumOfRows
	trimmed.State = info.State
	trimmed.MaxRowNum = info.MaxRowNum
	trimmed.LastExpireTime = info.LastExpireTime

	// Stream positions (shallow copy: same pointers as info).
	trimmed.StartPosition = info.StartPosition
	trimmed.DmlPosition = info.DmlPosition

	return trimmed
}

258
// GetCollection returns collection info with specified collection id
259
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
260 261
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
262
		return coll, nil
263 264 265 266
	}
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
267
		return nil, err
268 269
	}

270
	return h.s.meta.GetCollection(collectionID), nil
271 272
}

273
// CheckShouldDropChannel returns whether specified channel is marked to be removed
274
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
275 276 277 278 279 280 281 282 283 284 285 286 287
	/*
		segments := h.s.meta.GetSegmentsByChannel(channel)
		for _, segment := range segments {
			if segment.GetStartPosition() != nil && // filter empty segment
				// FIXME: we filter compaction generated segments
				// because datanode may not know the segment due to the network lag or
				// datacoord crash when handling CompleteCompaction.
				// FIXME: cancel this limitation for #12265
				// need to change a unified DropAndFlush to solve the root problem
				//len(segment.CompactionFrom) == 0 &&
				segment.GetState() != commonpb.SegmentState_Dropped {
				return false
			}
288
		}
289
		return false*/
290
	return h.s.meta.catalog.IsChannelDropped(h.s.ctx, channel)
291 292
}

293 294
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
295
func (h *ServerHandler) FinishDropChannel(channel string) {
296
	h.s.meta.catalog.DropChannel(h.s.ctx, channel)
297
}