package datacoord

import (
	"context"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/rootcoord"
	"go.uber.org/zap"
)

// Handler handles channel-related operations for ChannelManager
type Handler interface {
	// GetVChanPositions gets the information needed to recover a channel
	GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo
	// CheckShouldDropChannel returns whether the channel is marked to be removed
	CheckShouldDropChannel(channel string) bool
	// FinishDropChannel cleans up the remove flag for the channel
	FinishDropChannel(channel string)
}
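
// A hedged sketch of how a caller such as ChannelManager might drive the
// channel-drop flow (the names below are illustrative, not the actual call site):
//
//	if handler.CheckShouldDropChannel(vchannelName) {
//		// ... release the channel on the DataNode side ...
//		handler.FinishDropChannel(vchannelName)
//	}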

// ServerHandler is a helper of Server
type ServerHandler struct {
	s *Server
}

// newServerHandler creates a new ServerHandler
func newServerHandler(s *Server) *ServerHandler {
	return &ServerHandler{s: s}
}

// GetVChanPositions gets the latest positions of a vchannel with the provided dml channel name.
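// The seek position is chosen as the earliest dml/start position among the
// channel's non-dropped segments. For illustration (hypothetical values): with
// one segment at DmlPosition.Timestamp=100 and another at
// StartPosition.Timestamp=80, the returned SeekPosition has Timestamp=80, so
// replaying from it misses neither segment's data.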
func (h *ServerHandler) GetVChanPositions(channel string, collectionID UniqueID, partitionID UniqueID) *datapb.VchannelInfo {
	// cannot use GetSegmentsByChannel since dropped segments are needed here
	segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
		return s.InsertChannel == channel
	})
	log.Debug("GetSegmentsByChannel",
		zap.Int64("collectionID", collectionID),
		zap.String("channel", channel),
		zap.Int("numOfSegments", len(segments)),
	)
	var flushed []*datapb.SegmentInfo
	var unflushed []*datapb.SegmentInfo
	var dropped []*datapb.SegmentInfo
	var seekPosition *internalpb.MsgPosition
	for _, s := range segments {
		if (partitionID > allPartitionID && s.PartitionID != partitionID) ||
			(s.GetStartPosition() == nil && s.GetDmlPosition() == nil) {
			continue
		}

		if s.GetState() == commonpb.SegmentState_Dropped {
			dropped = append(dropped, trimSegmentInfo(s.SegmentInfo))
			continue
		}

		if s.GetState() == commonpb.SegmentState_Flushing || s.GetState() == commonpb.SegmentState_Flushed {
			flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
		} else {
			unflushed = append(unflushed, s.SegmentInfo)
		}

		var segmentPosition *internalpb.MsgPosition
		if s.GetDmlPosition() != nil {
			segmentPosition = s.GetDmlPosition()
		} else {
			segmentPosition = s.GetStartPosition()
		}

		// keep the earliest position seen so far; replaying the channel from
		// this position covers the data of every segment collected above
		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		collection := h.GetCollection(h.s.ctx, collectionID)
		if collection != nil {
			seekPosition = getCollectionStartPosition(channel, collection)
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
		DroppedSegments:   dropped,
	}
}

// getCollectionStartPosition returns the start position recorded for the
// given vchannel, matched by its physical channel name (hence the
// rootcoord.ToPhysicalChannel conversion); it returns nil when no matching
// key is found
func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
	for _, sp := range collectionInfo.GetStartPositions() {
		if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo with all binlog
// info left nil, presumably since binlog paths are not needed for channel recovery
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	return &datapb.SegmentInfo{
		ID:             info.ID,
		CollectionID:   info.CollectionID,
		PartitionID:    info.PartitionID,
		InsertChannel:  info.InsertChannel,
		NumOfRows:      info.NumOfRows,
		State:          info.State,
		MaxRowNum:      info.MaxRowNum,
		LastExpireTime: info.LastExpireTime,
		StartPosition:  info.StartPosition,
		DmlPosition:    info.DmlPosition,
	}
}

// GetCollection returns the collection info with the specified collection id
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
	coll := h.s.meta.GetCollection(collectionID)
	if coll != nil {
		return coll
	}
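	// cache miss: load the collection info from rootcoord, then read the cache again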
	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
	}

	return h.s.meta.GetCollection(collectionID)
}

// CheckShouldDropChannel returns whether the specified channel is marked to be removed
func (h *ServerHandler) CheckShouldDropChannel(channel string) bool {
	/*
		segments := h.s.meta.GetSegmentsByChannel(channel)
		for _, segment := range segments {
			if segment.GetStartPosition() != nil && // filter empty segment
				// FIXME: we filter compaction generated segments
				// because datanode may not know the segment due to the network lag or
				// datacoord crash when handling CompleteCompaction.
				// FIXME: cancel this limitation for #12265
				// need to change a unified DropAndFlush to solve the root problem
				//len(segment.CompactionFrom) == 0 &&
				segment.GetState() != commonpb.SegmentState_Dropped {
				return false
			}
		}
		return false*/
	return h.s.meta.ChannelHasRemoveFlag(channel)
}

// FinishDropChannel cleans up the remove flag for channels
// this function is a wrapper of server.meta.FinishRemoveChannel
func (h *ServerHandler) FinishDropChannel(channel string) {
	h.s.meta.FinishRemoveChannel(channel)
}