From 23900952321ca69034ae9acabb378150c1f92c35 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 17 Nov 2022 20:37:10 +0800 Subject: [PATCH] Fix load uses compacted segments' binlogs (#20655) Signed-off-by: sunby Signed-off-by: sunby Co-authored-by: sunby --- internal/datacoord/compaction.go | 5 +- internal/datacoord/meta.go | 59 +++++++++--- internal/datacoord/meta_test.go | 43 ++++----- internal/datacoord/mock_test.go | 2 +- internal/datacoord/segment_manager_test.go | 4 +- internal/datacoord/server.go | 7 +- internal/datanode/compactor.go | 102 ++++++++++++++++++++- internal/datanode/compactor_test.go | 4 +- internal/datanode/data_node.go | 1 + internal/datanode/data_sync_service.go | 3 +- internal/datanode/flush_manager.go | 2 +- internal/datanode/flush_manager_test.go | 7 -- 12 files changed, 183 insertions(+), 56 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 282f07310..07000650c 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -249,7 +249,10 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu } func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error { - oldSegments, modSegments, newSegment := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result) + oldSegments, modSegments, newSegment, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result) + if err != nil { + return err + } log := log.With(zap.Int64("planID", plan.GetPlanID())) modInfos := make([]*datapb.SegmentInfo, len(modSegments)) diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index ef379a128..515c8fbc9 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -20,6 +20,7 @@ package datacoord import ( "context" "fmt" + "path" "sync" "time" @@ -36,6 +37,8 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/metautil" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" @@ -44,11 +47,12 @@ import ( type meta struct { sync.RWMutex - ctx context.Context - catalog metastore.DataCoordCatalog - collections map[UniqueID]*collectionInfo // collection id to collection info - segments *SegmentsInfo // segment id to segment info - channelCPs map[string]*internalpb.MsgPosition // vChannel -> channel checkpoint/see position + ctx context.Context + catalog metastore.DataCoordCatalog + collections map[UniqueID]*collectionInfo // collection id to collection info + segments *SegmentsInfo // segment id to segment info + channelCPs map[string]*internalpb.MsgPosition // vChannel -> channel checkpoint/see position + chunkManager storage.ChunkManager } type collectionInfo struct { @@ -60,13 +64,14 @@ type collectionInfo struct { } // NewMeta creates meta from provided `kv.TxnKV` -func newMeta(ctx context.Context, kv kv.TxnKV, chunkManagerRootPath string) (*meta, error) { +func newMeta(ctx context.Context, kv kv.TxnKV, chunkManagerRootPath string, chunkManager storage.ChunkManager) (*meta, error) { mt := &meta{ - ctx: ctx, - catalog: &datacoord.Catalog{Txn: kv, ChunkManagerRootPath: chunkManagerRootPath}, - collections: make(map[UniqueID]*collectionInfo), - segments: NewSegmentsInfo(), - channelCPs: make(map[string]*internalpb.MsgPosition), + ctx: ctx, + catalog: &datacoord.Catalog{Txn: kv, ChunkManagerRootPath: chunkManagerRootPath}, + collections: make(map[UniqueID]*collectionInfo), + segments: NewSegmentsInfo(), + channelCPs: make(map[string]*internalpb.MsgPosition), + chunkManager: chunkManager, } err := mt.reloadFromKV() if err != nil { @@ -892,7 +897,7 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { // - the segment info of compactedFrom segments after compaction to alter // - the segment info of compactedTo segment after compaction to add // The compactedTo segment could contain 0 numRows -func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo) { +func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo, error) { log.Info("meta update: prepare for complete compaction mutation") m.Lock() defer m.Unlock() @@ -938,7 +943,11 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac } newAddedDeltalogs := m.updateDeltalogs(originDeltalogs, deletedDeltalogs, nil) - deltalogs := append(result.GetDeltalogs(), newAddedDeltalogs...) + copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, result.GetSegmentID()) + if err != nil { + return nil, nil, nil, err + } + deltalogs := append(result.GetDeltalogs(), copiedDeltalogs...) compactionFrom := make([]UniqueID, 0, len(modSegments)) for _, s := range modSegments { @@ -970,7 +979,29 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac zap.Int64("new segment num of rows", segment.GetNumOfRows()), zap.Any("compacted from", segment.GetCompactionFrom())) - return oldSegments, modSegments, segment + return oldSegments, modSegments, segment, nil +} + +func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) { + ret := make([]*datapb.FieldBinlog, 0, len(binlogs)) + for _, fieldBinlog := range binlogs { + fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog) + for _, binlog := range fieldBinlog.Binlogs { + blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID) + blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) + blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath) + if err != nil { + return nil, err + } + err = m.chunkManager.Write(m.ctx, blobPath, blob) + if err != nil { + return nil, err + } + binlog.LogPath = blobPath + } + ret = append(ret, fieldBinlog) + } + return ret, nil } func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error { diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index 14aeb01e5..e68ec607c 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -187,55 +187,55 @@ func (mek *mockKvIllegalStatslog) LoadWithPrefix(key string) ([]string, []string func TestMetaReloadFromKV(t *testing.T) { t.Run("Test ReloadFromKV success", func(t *testing.T) { fkv := &mockEtcdKv{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.Nil(t, err) }) // load segment error t.Run("Test ReloadFromKV load segment fails", func(t *testing.T) { fkv := &mockKvLoadSegmentError{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) // illegal segment info t.Run("Test ReloadFromKV unmarshal segment fails", func(t *testing.T) { fkv := &mockKvIllegalSegment{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) // load binlog/deltalog/statslog error t.Run("Test ReloadFromKV load binlog fails", func(t *testing.T) { fkv := &mockKvLoadBinlogError{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) t.Run("Test ReloadFromKV load deltalog fails", func(t *testing.T) { fkv := &mockKvLoadDeltaBinlogError{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) t.Run("Test ReloadFromKV load statslog fails", func(t *testing.T) { fkv := &mockKvLoadStatsBinlogError{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) // illegal binlog/deltalog/statslog info t.Run("Test ReloadFromKV unmarshal binlog fails", func(t *testing.T) { fkv := &mockKvIllegalBinlog{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) t.Run("Test ReloadFromKV unmarshal deltalog fails", func(t *testing.T) { fkv := &mockKvIllegalDeltalog{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) t.Run("Test ReloadFromKV unmarshal statslog fails", func(t *testing.T) { fkv := &mockKvIllegalStatslog{} - _, err := newMeta(context.TODO(), fkv, "") + _, err := newMeta(context.TODO(), fkv, "", nil) assert.NotNil(t, err) }) } @@ -370,14 +370,14 @@ func TestMeta_Basic(t *testing.T) { // inject error for `Save` memoryKV := memkv.NewMemoryKV() fkv := &saveFailKV{TxnKV: memoryKV} - meta, err := newMeta(context.TODO(), fkv, "") + meta, err := newMeta(context.TODO(), fkv, "", nil) assert.Nil(t, err) err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{})) assert.NotNil(t, err) fkv2 := &removeFailKV{TxnKV: memoryKV} - meta, err = newMeta(context.TODO(), fkv2, "") + meta, err = newMeta(context.TODO(), fkv2, "", nil) assert.Nil(t, err) // nil, since no segment yet err = meta.DropSegment(0) @@ -389,7 +389,7 @@ func TestMeta_Basic(t *testing.T) { err = meta.DropSegment(0) assert.NotNil(t, err) - meta, err = newMeta(context.TODO(), fkv, "") + meta, err = newMeta(context.TODO(), fkv, "", nil) assert.Nil(t, err) }) @@ -523,7 +523,7 @@ func TestGetUnFlushedSegments(t *testing.T) { func TestUpdateFlushSegmentsInfo(t *testing.T) { t.Run("normal", func(t *testing.T) { - meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.Nil(t, err) segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0")}, @@ -561,7 +561,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { }) t.Run("update non-existed segment", func(t *testing.T) { - meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.Nil(t, err) err = meta.UpdateFlushSegmentsInfo(1, false, false, false, nil, nil, nil, nil, nil) @@ -569,7 +569,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { }) t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) { - meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.Nil(t, err) segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}} @@ -586,7 +586,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { t.Run("test save etcd failed", func(t *testing.T) { kv := memkv.NewMemoryKV() failedKv := &saveFailKV{kv} - meta, err := newMeta(context.TODO(), failedKv, "") + meta, err := newMeta(context.TODO(), failedKv, "", nil) assert.Nil(t, err) segmentInfo := &SegmentInfo{ @@ -614,7 +614,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { func TestSaveHandoffMeta(t *testing.T) { kvClient := memkv.NewMemoryKV() - meta, err := newMeta(context.TODO(), kvClient, "") + meta, err := newMeta(context.TODO(), kvClient, "", nil) assert.Nil(t, err) info := &datapb.SegmentInfo{ @@ -785,7 +785,8 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")}, NumOfRows: 1, } - beforeCompact, afterCompact, newSegment := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult) + beforeCompact, afterCompact, newSegment, err := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult) + assert.Nil(t, err) assert.NotNil(t, beforeCompact) assert.NotNil(t, afterCompact) assert.NotNil(t, newSegment) @@ -1064,7 +1065,7 @@ func TestChannelCP(t *testing.T) { } t.Run("UpdateChannelCheckpoint", func(t *testing.T) { - meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.NoError(t, err) // nil position @@ -1076,7 +1077,7 @@ func TestChannelCP(t *testing.T) { }) t.Run("GetChannelCheckpoint", func(t *testing.T) { - meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.NoError(t, err) position := meta.GetChannelCheckpoint(mockVChannel) @@ -1091,7 +1092,7 @@ func TestChannelCP(t *testing.T) { }) t.Run("DropChannelCheckpoint", func(t *testing.T) { - meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") + meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil) assert.NoError(t, err) err = meta.DropChannelCheckpoint(mockVChannel) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 61c3c7680..46780f22c 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -40,7 +40,7 @@ import ( func newMemoryMeta() (*meta, error) { memoryKV := memkv.NewMemoryKV() - return newMeta(context.TODO(), memoryKV, "") + return newMeta(context.TODO(), memoryKV, "", nil) } var _ allocator = (*MockAllocator)(nil) diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index 5902389c2..3d92b3bd1 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -492,7 +492,7 @@ func TestTryToSealSegment(t *testing.T) { mockAllocator := newMockAllocator() memoryKV := memkv.NewMemoryKV() fkv := &saveFailKV{TxnKV: memoryKV} - meta, err := newMeta(context.TODO(), memoryKV, "") + meta, err := newMeta(context.TODO(), memoryKV, "", nil) assert.Nil(t, err) @@ -518,7 +518,7 @@ func TestTryToSealSegment(t *testing.T) { mockAllocator := newMockAllocator() memoryKV := memkv.NewMemoryKV() fkv := &saveFailKV{TxnKV: memoryKV} - meta, err := newMeta(context.TODO(), memoryKV, "") + meta, err := newMeta(context.TODO(), memoryKV, "", nil) assert.Nil(t, err) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 562fccf94..ed513dc77 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -262,7 +262,7 @@ func (s *Server) Init() error { return err } - if err = s.initMeta(storageCli.RootPath()); err != nil { + if err = s.initMeta(storageCli.RootPath(), storageCli); err != nil { return err } @@ -454,12 +454,13 @@ func (s *Server) initSegmentManager() { } } -func (s *Server) initMeta(chunkManagerRootPath string) error { +func (s *Server) initMeta(chunkManagerRootPath string, chunkManager storage.ChunkManager) error { etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) + s.kvClient = etcdKV reloadEtcdFn := func() error { var err error - s.meta, err = newMeta(s.ctx, s.kvClient, chunkManagerRootPath) + s.meta, err = newMeta(s.ctx, s.kvClient, chunkManagerRootPath, chunkManager) if err != nil { return err } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index be2469ae1..f3d9d683a 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -21,17 +21,21 @@ import ( "errors" "fmt" "math" + "path" "strconv" + "strings" "sync" "time" "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/metautil" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -77,8 +81,9 @@ type compactionTask struct { ctx context.Context cancel context.CancelFunc - wg sync.WaitGroup - tr *timerecord.TimeRecorder + wg sync.WaitGroup + tr *timerecord.TimeRecorder + chunkManager storage.ChunkManager } // check if compactionTask implements compactor @@ -91,7 +96,8 @@ func newCompactionTask( channel Channel, fm flushManager, alloc allocatorInterface, - plan *datapb.CompactionPlan) *compactionTask { + plan *datapb.CompactionPlan, + chunkManager storage.ChunkManager) *compactionTask { ctx1, cancel := context.WithCancel(ctx) return &compactionTask{ @@ -105,6 +111,7 @@ func newCompactionTask( allocatorInterface: alloc, plan: plan, tr: timerecord.NewTimeRecorder("compactionTask"), + chunkManager: chunkManager, } } @@ -458,7 +465,96 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { // Inject to stop flush injectStart := time.Now() ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) { + collectionID := meta.GetID() pack.segmentID = targetSegID + for _, insertLog := range pack.insertLogs { + splits := strings.Split(insertLog.LogPath, "/") + if len(splits) < 2 { + pack.err = fmt.Errorf("bad insert log path: %s", insertLog.LogPath) + return + } + logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) + if err != nil { + pack.err = err + return + } + fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64) + if err != nil { + pack.err = err + return + } + blobKey := metautil.JoinIDPath(collectionID, partID, targetSegID, fieldID, logID) + blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, blobKey) + blob, err := t.chunkManager.Read(t.ctx, insertLog.LogPath) + if err != nil { + pack.err = err + return + } + err = t.chunkManager.Write(t.ctx, blobPath, blob) + if err != nil { + pack.err = err + return + } + insertLog.LogPath = blobPath + } + + for _, deltaLog := range pack.deltaLogs { + splits := strings.Split(deltaLog.LogPath, "/") + if len(splits) < 1 { + pack.err = fmt.Errorf("delta stats log path: %s", deltaLog.LogPath) + return + } + logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) + if err != nil { + pack.err = err + return + } + blobKey := metautil.JoinIDPath(collectionID, partID, targetSegID, logID) + blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey) + blob, err := t.chunkManager.Read(t.ctx, deltaLog.LogPath) + if err != nil { + pack.err = err + return + } + err = t.chunkManager.Write(t.ctx, blobPath, blob) + if err != nil { + pack.err = err + return + } + deltaLog.LogPath = blobPath + } + + for _, statsLog := range pack.statsLogs { + splits := strings.Split(statsLog.LogPath, "/") + if len(splits) < 2 { + pack.err = fmt.Errorf("bad stats log path: %s", statsLog.LogPath) + return + } + logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) + if err != nil { + pack.err = err + return + } + fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64) + if err != nil { + pack.err = err + return + } + blobKey := metautil.JoinIDPath(collectionID, partID, targetSegID, fieldID, logID) + blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentStatslogPath, blobKey) + + blob, err := t.chunkManager.Read(t.ctx, statsLog.LogPath) + if err != nil { + pack.err = err + return + } + err = t.chunkManager.Write(t.ctx, blobPath, blob) + if err != nil { + pack.err = err + return + } + statsLog.LogPath = blobPath + } }) defer close(ti.injectOver) diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index ee6d1f1a1..71ed40812 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -691,7 +691,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { } alloc.random = false // generated ID = 19530 - task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan) + task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan, nil) result, err := task.compact() assert.NoError(t, err) assert.NotNil(t, result) @@ -822,7 +822,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { } alloc.random = false // generated ID = 19530 - task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan) + task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan, nil) result, err := task.compact() assert.NoError(t, err) assert.NotNil(t, result) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index daf3ed5f7..ac08a868f 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -844,6 +844,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan ds.flushManager, ds.idAllocator, req, + node.chunkManager, ) node.compactionExecutor.execute(task) diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 60ce0724b..4c4d3211a 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/paramtable" + "github.com/milvus-io/milvus/internal/util/retry" ) // dataSyncService controls a flowgraph for a specific collection @@ -160,7 +161,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) // initialize flush manager for DataSync Service dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel, - flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService)) + flushNotifyFunc(dsService, retry.Attempts(50)), dropVirtualChannelFunc(dsService)) var err error // recover segment checkpoints diff --git a/internal/datanode/flush_manager.go b/internal/datanode/flush_manager.go index c6de016c9..154337493 100644 --- a/internal/datanode/flush_manager.go +++ b/internal/datanode/flush_manager.go @@ -822,7 +822,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet log.Warn("failed to SaveBinlogPaths", zap.Int64("segment ID", pack.segmentID), zap.Error(errors.New(rsp.GetReason()))) - return nil + return fmt.Errorf("segment %d not found", pack.segmentID) } // meta error, datanode handles a virtual channel does not belong here if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed { diff --git a/internal/datanode/flush_manager_test.go b/internal/datanode/flush_manager_test.go index 3f494c6f5..9854c0525 100644 --- a/internal/datanode/flush_manager_test.go +++ b/internal/datanode/flush_manager_test.go @@ -583,13 +583,6 @@ func TestFlushNotifyFunc(t *testing.T) { }) }) - t.Run("stale segment not found", func(t *testing.T) { - dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound - assert.NotPanics(t, func() { - notifyFunc(&segmentFlushPack{flushed: false}) - }) - }) - // issue https://github.com/milvus-io/milvus/issues/17097 // meta error, datanode shall not panic, just drop the virtual channel t.Run("datacoord found meta error", func(t *testing.T) { -- GitLab