handler.go 10.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17 18 19 20 21
package datacoord

import (
	"context"

S
SimFG 已提交
22
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
23 24 25
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
26
	"github.com/milvus-io/milvus/internal/util/funcutil"
27
	"github.com/milvus-io/milvus/internal/util/tsoutil"
28
	"github.com/milvus-io/milvus/internal/util/typeutil"
G
godchen 已提交
29
	"go.uber.org/zap"
30 31
)

32
// Handler handles some channel method for ChannelManager
33
type Handler interface {
X
XuanYang-cn 已提交
34 35 36 37
	// GetQueryVChanPositions gets the information recovery needed of a channel for QueryCoord
	GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
	// GetDataVChanPositions gets the information recovery needed of a channel for DataNode
	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
38
	CheckShouldDropChannel(channel string) bool
39
	FinishDropChannel(channel string) error
40
	GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
41 42
}

43
// ServerHandler is a helper of Server
44 45 46 47
type ServerHandler struct {
	s *Server
}

48
// newServerHandler creates a new ServerHandler
49 50 51 52
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

X
XuanYang-cn 已提交
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
// GetDataVChanPositions gets vchannel latest postitions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel.Name
	})
	log.Info("GetDataVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
	)
	var (
		flushedIDs   = make(typeutil.UniqueSet)
		unflushedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
	)
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
		if s.GetIsImporting() {
G
groot 已提交
74
			// Skip bulk insert segments.
X
XuanYang-cn 已提交
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
			continue
		}

		if s.GetState() == commonpb.SegmentState_Dropped {
			droppedIDs.Insert(s.GetID())
			continue
		} else if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
			flushedIDs.Insert(s.GetID())
		} else {
			unflushedIDs.Insert(s.GetID())
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
91
		SeekPosition:        h.getChannelCheckpoint(channel),
X
XuanYang-cn 已提交
92 93 94 95 96 97 98
		FlushedSegmentIds:   flushedIDs.Collect(),
		UnflushedSegmentIds: unflushedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
	}
}

// GetQueryVChanPositions gets vchannel latest postitions with provided dml channel names for QueryCoord,
99 100
// we expect QueryCoord gets the indexed segments to load, so the flushed segments below are actually the indexed segments,
// the unflushed segments are actually the segments without index, even they are flushed.
X
XuanYang-cn 已提交
101
func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo {
102 103
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
104
		return s.InsertChannel == channel.Name
105
	})
106
	segmentInfos := make(map[int64]*SegmentInfo)
107
	indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
108 109 110 111
	indexed := make(typeutil.UniqueSet)
	for _, segment := range indexedSegments {
		indexed.Insert(segment.GetID())
	}
X
XuanYang-cn 已提交
112 113 114 115
	log.Info("GetQueryVChanPositions",
		zap.Int64("collectionID", channel.CollectionID),
		zap.String("channel", channel.Name),
		zap.Int("numOfSegments", len(segments)),
116
	)
117
	var (
118 119 120
		indexedIDs   = make(typeutil.UniqueSet)
		unIndexedIDs = make(typeutil.UniqueSet)
		droppedIDs   = make(typeutil.UniqueSet)
121
	)
122 123 124 125 126
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}
127
		if s.GetIsImporting() {
G
groot 已提交
128
			// Skip bulk insert segments.
129 130
			continue
		}
131
		segmentInfos[s.GetID()] = s
132
		if s.GetState() == commonpb.SegmentState_Dropped {
133
			droppedIDs.Insert(s.GetID())
134
		} else if indexed.Contain(s.GetID()) {
135
			indexedIDs.Insert(s.GetID())
136
		} else {
137
			unIndexedIDs.Insert(s.GetID())
138 139
		}
	}
140
	for id := range unIndexedIDs {
141 142
		// Indexed segments are compacted to a raw segment,
		// replace it with the indexed ones
143
		if len(segmentInfos[id].GetCompactionFrom()) > 0 &&
144
			indexed.Contain(segmentInfos[id].GetCompactionFrom()...) {
145 146 147
			unIndexedIDs.Remove(id)
			indexedIDs.Insert(segmentInfos[id].GetCompactionFrom()...)
			droppedIDs.Remove(segmentInfos[id].GetCompactionFrom()...)
148
		}
149
	}
150

151 152 153 154 155 156 157
	return &datapb.VchannelInfo{
		CollectionID:        channel.CollectionID,
		ChannelName:         channel.Name,
		SeekPosition:        h.getChannelCheckpoint(channel),
		FlushedSegmentIds:   indexedIDs.Collect(),
		UnflushedSegmentIds: unIndexedIDs.Collect(),
		DroppedSegmentIds:   droppedIDs.Collect(),
158
	}
159
}
160

161 162 163 164 165 166 167 168 169 170
func (h *ServerHandler) getChannelCheckpoint(channel *channel) *internalpb.MsgPosition {
	seekPosition := h.s.meta.GetChannelCheckpoint(channel.Name)
	if seekPosition != nil {
		log.Info("channel seek position set from ChannelCP",
			zap.String("channel", channel.Name),
			zap.Uint64("position timestamp", seekPosition.Timestamp),
			zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
		)
	} else {
		// use collection start position when segment position is not found
171
		if channel.StartPositions == nil {
172 173
			collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
			if collection != nil && err == nil {
174 175
				seekPosition = getCollectionStartPosition(channel.Name, collection)
			}
176 177 178 179 180
			log.Info("NEITHER segment position or channel start position are found, setting channel seek position to collection start position",
				zap.String("channel", channel.Name),
				zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
				zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
			)
181
		} else {
182
			// use passed start positions, skip to ask RootCoord.
183
			seekPosition = toMsgPosition(channel.Name, channel.StartPositions)
184 185 186 187 188
			log.Info("segment position not found, setting channel seek position to channel start position",
				zap.String("channel", channel.Name),
				zap.Uint64("position timestamp", seekPosition.GetTimestamp()),
				zap.Time("realworld position timestamp", tsoutil.PhysicalTime(seekPosition.GetTimestamp())),
			)
189 190
		}
	}
191
	return seekPosition
192 193
}

J
jaime 已提交
194 195
func getCollectionStartPosition(channel string, collectionInfo *collectionInfo) *internalpb.MsgPosition {
	return toMsgPosition(channel, collectionInfo.StartPositions)
196 197 198 199
}

func toMsgPosition(channel string, startPositions []*commonpb.KeyDataPair) *internalpb.MsgPosition {
	for _, sp := range startPositions {
200
		if sp.GetKey() != funcutil.ToPhysicalChannel(channel) {
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a new datapb.SegmentInfo that carries only these
// fields of info: ID, CollectionID, PartitionID, InsertChannel, NumOfRows,
// State, MaxRowNum, LastExpireTime, StartPosition and DmlPosition. Every other
// field, including all binlog info, is left unset. The copy is shallow: the
// StartPosition and DmlPosition messages are shared with info, not cloned.
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	trimmed := &datapb.SegmentInfo{}
	trimmed.ID = info.ID
	trimmed.CollectionID = info.CollectionID
	trimmed.PartitionID = info.PartitionID
	trimmed.InsertChannel = info.InsertChannel
	trimmed.NumOfRows = info.NumOfRows
	trimmed.State = info.State
	trimmed.MaxRowNum = info.MaxRowNum
	trimmed.LastExpireTime = info.LastExpireTime
	trimmed.StartPosition = info.StartPosition
	trimmed.DmlPosition = info.DmlPosition
	return trimmed
}

227
// GetCollection returns collection info with specified collection id
228
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
229 230
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
231
		return coll, nil
232 233 234 235
	}
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
236
		return nil, err
237 238
	}

239
	return h.s.meta.GetCollection(collectionID), nil
240 241
}

242
// CheckShouldDropChannel returns whether specified channel is marked to be removed
243
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
244 245 246 247 248 249 250 251 252 253 254 255 256
	/*
		segments := h.s.meta.GetSegmentsByChannel(channel)
		for _, segment := range segments {
			if segment.GetStartPosition() != nil && // filter empty segment
				// FIXME: we filter compaction generated segments
				// because datanode may not know the segment due to the network lag or
				// datacoord crash when handling CompleteCompaction.
				// FIXME: cancel this limitation for #12265
				// need to change a unified DropAndFlush to solve the root problem
				//len(segment.CompactionFrom) == 0 &&
				segment.GetState() != commonpb.SegmentState_Dropped {
				return false
			}
257
		}
258
		return false*/
259
	return h.s.meta.catalog.IsChannelDropped(h.s.ctx, channel)
260 261
}

262 263
// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishDropChannel
264 265 266 267 268 269 270 271 272 273 274 275
func (h *ServerHandler) FinishDropChannel(channel string) error {
	err := h.s.meta.catalog.DropChannel(h.s.ctx, channel)
	if err != nil {
		log.Warn("DropChannel failed", zap.String("vChannel", channel), zap.Error(err))
		return err
	}
	err = h.s.meta.DropChannelCheckpoint(channel)
	if err != nil {
		log.Warn("DropChannelCheckpoint failed", zap.String("vChannel", channel), zap.Error(err))
		return err
	}
	return nil
276
}