未验证 提交 15ae68fa 编写于 作者: Y yah01 提交者: GitHub

Refresh targets atomically (#20666)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 29178567
......@@ -46,7 +46,11 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) {
mgr.rwmutex.Lock()
defer mgr.rwmutex.Unlock()
log.Info("remove collection from targets")
mgr.removeCollection(collectionID)
}
func (mgr *TargetManager) removeCollection(collectionID int64) {
log.Info("remove collection from targets", zap.Int64("collectionID", collectionID))
for _, segment := range mgr.segments {
if segment.CollectionID == collectionID {
mgr.removeSegment(segment.GetID())
......@@ -83,7 +87,15 @@ func (mgr *TargetManager) RemoveSegment(segmentID int64) {
func (mgr *TargetManager) removeSegment(segmentID int64) {
delete(mgr.segments, segmentID)
log.Info("segment removed from targets", zap.Int64("segment", segmentID))
log.Info("segment removed from targets", zap.Int64("segmentID", segmentID))
}
func (mgr *TargetManager) Replace(collectionID int64, channels []*DmChannel, segments []*datapb.SegmentInfo) {
mgr.rwmutex.Lock()
defer mgr.rwmutex.Unlock()
mgr.addDmChannel(channels...)
mgr.addSegment(segments...)
}
// AddSegment adds segment into target set,
......@@ -147,6 +159,10 @@ func (mgr *TargetManager) AddDmChannel(channels ...*DmChannel) {
mgr.rwmutex.Lock()
defer mgr.rwmutex.Unlock()
mgr.addDmChannel(channels...)
}
func (mgr *TargetManager) addDmChannel(channels ...*DmChannel) {
for _, channel := range channels {
ts := channel.GetSeekPosition().GetTimestamp()
log.Info("add channel into targets",
......
......@@ -180,7 +180,6 @@ func (ob *CollectionObserver) refreshTargets(updatedAt time.Time, collectionID i
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ob.targetMgr.RemoveCollection(collectionID)
ob.handoffOb.Unregister(ctx, collectionID)
if len(partitions) == 0 {
......@@ -193,12 +192,12 @@ func (ob *CollectionObserver) refreshTargets(updatedAt time.Time, collectionID i
}
ob.handoffOb.Register(collectionID)
utils.RegisterTargets(ctx,
ob.targetMgr,
ob.broker,
collectionID,
partitions,
)
channels, segments, err := utils.FetchTargets(ctx, ob.targetMgr, ob.broker, collectionID, partitions)
if err != nil {
log.Warn("failed to fetch targets from DataCoord, will refresh targets later", zap.Error(err))
return false
}
ob.targetMgr.Replace(collectionID, channels, segments)
ob.refreshed[collectionID] = updatedAt
return true
......
......@@ -131,8 +131,27 @@ func SpawnReplicas(replicaMgr *meta.ReplicaManager, nodeMgr *session.NodeManager
func RegisterTargets(ctx context.Context,
targetMgr *meta.TargetManager,
broker meta.Broker,
collection int64, partitions []int64) error {
dmChannels := make(map[string][]*datapb.VchannelInfo)
collection int64,
partitions []int64,
) error {
channels, segments, err := FetchTargets(ctx, targetMgr, broker, collection, partitions)
if err != nil {
return err
}
targetMgr.AddDmChannel(channels...)
targetMgr.AddSegment(segments...)
return nil
}
func FetchTargets(ctx context.Context,
targetMgr *meta.TargetManager,
broker meta.Broker,
collection int64,
partitions []int64,
) ([]*meta.DmChannel, []*datapb.SegmentInfo, error) {
channels := make(map[string][]*datapb.VchannelInfo)
segments := make([]*datapb.SegmentInfo, 0)
for _, partitionID := range partitions {
log.Debug("get recovery info...",
......@@ -140,12 +159,12 @@ func RegisterTargets(ctx context.Context,
zap.Int64("partitionID", partitionID))
vChannelInfos, binlogs, err := broker.GetRecoveryInfo(ctx, collection, partitionID)
if err != nil {
return err
return nil, nil, err
}
// Register segments
for _, segmentBinlogs := range binlogs {
targetMgr.AddSegment(SegmentBinlogs2SegmentInfo(
segments = append(segments, SegmentBinlogs2SegmentInfo(
collection,
partitionID,
segmentBinlogs))
......@@ -153,13 +172,14 @@ func RegisterTargets(ctx context.Context,
for _, info := range vChannelInfos {
channelName := info.GetChannelName()
dmChannels[channelName] = append(dmChannels[channelName], info)
channels[channelName] = append(channels[channelName], info)
}
}
// Merge and register channels
for _, channels := range dmChannels {
dmChannels := make([]*meta.DmChannel, 0, len(channels))
for _, channels := range channels {
dmChannel := MergeDmChannelInfo(channels)
targetMgr.AddDmChannel(dmChannel)
dmChannels = append(dmChannels, dmChannel)
}
return nil
return dmChannels, segments, nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册