未验证 提交 d525cacb 编写于 作者: C congqixia 提交者: GitHub

Remove empty delta flow graph (#20076)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 d8ca56e0
......@@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/util/funcutil"
)
// dataSyncService manages a lot of flow graphs
......@@ -246,6 +247,50 @@ func (dsService *dataSyncService) removeFlowGraphsByDeltaChannels(channels []Cha
}
}
// removeEmptyFlowGraphByChannel would remove delta flow graph if no seal segment exists in meta.
// *segment shall be added into meta before add flow graph
// *release sealed segment shall always come after load completed
func (dsService *dataSyncService) removeEmptyFlowGraphByChannel(collectionID int64, channel string) {
log := log.With(
zap.Int64("collectionID", collectionID),
zap.String("channel", channel),
)
dsService.mu.Lock()
defer dsService.mu.Unlock()
// convert dml channel name to delta channel name
dc, err := funcutil.ConvertChannelName(channel, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
if err != nil {
log.Warn("removeEmptyFGByDelta failed to convert channel to delta", zap.Error(err))
return
}
// check flow graph exists first
fg, ok := dsService.deltaChannel2FlowGraph[dc]
if !ok {
log.Warn("remove delta flowgraph does not exist")
return
}
// get all sealed segments associated with this channel
segments, err := dsService.metaReplica.getSegmentIDsByVChannel(nil, channel, segmentTypeSealed)
if err != nil {
log.Warn("removeEmptyFGByDelta failed to check segments with VChannel", zap.Error(err))
}
// check whether there are still not released segments
if len(segments) > 0 {
return
}
// start to release flow graph
log.Info("all segments released, start to remove deltaChannel flowgraph")
fg.close()
delete(dsService.deltaChannel2FlowGraph, dc)
dsService.tSafeReplica.removeTSafe(dc)
rateCol.removeTSafeChannel(dc)
}
// newDataSyncService returns a new dataSyncService
func newDataSyncService(ctx context.Context,
metaReplica ReplicaInterface,
......
......@@ -18,9 +18,13 @@ package querynode
import (
"context"
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
func TestDataSyncService_DMLFlowGraphs(t *testing.T) {
......@@ -199,3 +203,83 @@ func TestDataSyncService_checkReplica(t *testing.T) {
dataSyncService.tSafeReplica.addTSafe(defaultDMLChannel)
})
}
type DataSyncServiceSuite struct {
suite.Suite
factory dependency.Factory
dsService *dataSyncService
}
func (s *DataSyncServiceSuite) SetupSuite() {
s.factory = genFactory()
}
func (s *DataSyncServiceSuite) SetupTest() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
replica, err := genSimpleReplica()
s.Require().NoError(err)
tSafe := newTSafeReplica()
s.dsService = newDataSyncService(ctx, replica, tSafe, s.factory)
s.Require().NoError(err)
}
func (s *DataSyncServiceSuite) TearDownTest() {
s.dsService.close()
s.dsService = nil
}
func (s *DataSyncServiceSuite) TestRemoveEmptyFlowgraphByChannel() {
s.Run("non existing channel", func() {
s.Assert().NotPanics(func() {
channelName := fmt.Sprintf("%s_%d_1", Params.CommonCfg.RootCoordDml, defaultCollectionID)
s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, channelName)
})
})
s.Run("bad format channel", func() {
s.Assert().NotPanics(func() {
s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, "")
})
})
s.Run("non-empty flowgraph", func() {
channelName := fmt.Sprintf("%s_%d_1", Params.CommonCfg.RootCoordDml, defaultCollectionID)
deltaChannelName, err := funcutil.ConvertChannelName(channelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
s.Require().NoError(err)
err = s.dsService.metaReplica.addSegment(defaultSegmentID, defaultPartitionID, defaultCollectionID, channelName, defaultSegmentVersion, segmentTypeSealed)
s.Require().NoError(err)
_, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName})
s.Require().NoError(err)
s.Assert().NotPanics(func() {
s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, channelName)
})
_, err = s.dsService.getFlowGraphByDeltaChannel(defaultCollectionID, deltaChannelName)
s.Assert().NoError(err)
})
s.Run("empty flowgraph", func() {
channelName := fmt.Sprintf("%s_%d_2", Params.CommonCfg.RootCoordDml, defaultCollectionID)
deltaChannelName, err := funcutil.ConvertChannelName(channelName, Params.CommonCfg.RootCoordDml, Params.CommonCfg.RootCoordDelta)
s.Require().NoError(err)
_, err = s.dsService.addFlowGraphsForDeltaChannels(defaultCollectionID, []string{deltaChannelName})
s.Require().NoError(err)
s.Assert().NotPanics(func() {
s.dsService.removeEmptyFlowGraphByChannel(defaultCollectionID, channelName)
})
_, err = s.dsService.getFlowGraphByDeltaChannel(defaultCollectionID, deltaChannelName)
s.Assert().Error(err)
})
}
func TestDataSyncServiceSuite(t *testing.T) {
suite.Run(t, new(DataSyncServiceSuite))
}
......@@ -599,6 +599,9 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *querypb.ReleaseS
}
}
// note that argument is dmlchannel name
node.dataSyncService.removeEmptyFlowGraphByChannel(in.GetCollectionID(), in.GetShard())
log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs), zap.String("Scope", in.GetScope().String()))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册