未验证 提交 23900952 编写于 作者: B Bingyi Sun 提交者: GitHub

Fix load using compacted segments' binlogs (#20655)

Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
Co-authored-by: Nsunby <bingyi.sun@zilliz.com>
上级 fef87f87
...@@ -249,7 +249,10 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu ...@@ -249,7 +249,10 @@ func (c *compactionPlanHandler) completeCompaction(result *datapb.CompactionResu
} }
func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error { func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.CompactionPlan, result *datapb.CompactionResult) error {
oldSegments, modSegments, newSegment := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result) oldSegments, modSegments, newSegment, err := c.meta.PrepareCompleteCompactionMutation(plan.GetSegmentBinlogs(), result)
if err != nil {
return err
}
log := log.With(zap.Int64("planID", plan.GetPlanID())) log := log.With(zap.Int64("planID", plan.GetPlanID()))
modInfos := make([]*datapb.SegmentInfo, len(modSegments)) modInfos := make([]*datapb.SegmentInfo, len(modSegments))
......
...@@ -20,6 +20,7 @@ package datacoord ...@@ -20,6 +20,7 @@ package datacoord
import ( import (
"context" "context"
"fmt" "fmt"
"path"
"sync" "sync"
"time" "time"
...@@ -36,6 +37,8 @@ import ( ...@@ -36,6 +37,8 @@ import (
"github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
...@@ -44,11 +47,12 @@ import ( ...@@ -44,11 +47,12 @@ import (
type meta struct { type meta struct {
sync.RWMutex sync.RWMutex
ctx context.Context ctx context.Context
catalog metastore.DataCoordCatalog catalog metastore.DataCoordCatalog
collections map[UniqueID]*collectionInfo // collection id to collection info collections map[UniqueID]*collectionInfo // collection id to collection info
segments *SegmentsInfo // segment id to segment info segments *SegmentsInfo // segment id to segment info
channelCPs map[string]*internalpb.MsgPosition // vChannel -> channel checkpoint/see position channelCPs map[string]*internalpb.MsgPosition // vChannel -> channel checkpoint/see position
chunkManager storage.ChunkManager
} }
type collectionInfo struct { type collectionInfo struct {
...@@ -60,13 +64,14 @@ type collectionInfo struct { ...@@ -60,13 +64,14 @@ type collectionInfo struct {
} }
// NewMeta creates meta from provided `kv.TxnKV` // NewMeta creates meta from provided `kv.TxnKV`
func newMeta(ctx context.Context, kv kv.TxnKV, chunkManagerRootPath string) (*meta, error) { func newMeta(ctx context.Context, kv kv.TxnKV, chunkManagerRootPath string, chunkManager storage.ChunkManager) (*meta, error) {
mt := &meta{ mt := &meta{
ctx: ctx, ctx: ctx,
catalog: &datacoord.Catalog{Txn: kv, ChunkManagerRootPath: chunkManagerRootPath}, catalog: &datacoord.Catalog{Txn: kv, ChunkManagerRootPath: chunkManagerRootPath},
collections: make(map[UniqueID]*collectionInfo), collections: make(map[UniqueID]*collectionInfo),
segments: NewSegmentsInfo(), segments: NewSegmentsInfo(),
channelCPs: make(map[string]*internalpb.MsgPosition), channelCPs: make(map[string]*internalpb.MsgPosition),
chunkManager: chunkManager,
} }
err := mt.reloadFromKV() err := mt.reloadFromKV()
if err != nil { if err != nil {
...@@ -892,7 +897,7 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) { ...@@ -892,7 +897,7 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
// - the segment info of compactedFrom segments after compaction to alter // - the segment info of compactedFrom segments after compaction to alter
// - the segment info of compactedTo segment after compaction to add // - the segment info of compactedTo segment after compaction to add
// The compactedTo segment could contain 0 numRows // The compactedTo segment could contain 0 numRows
func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo) { func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs, result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo, error) {
log.Info("meta update: prepare for complete compaction mutation") log.Info("meta update: prepare for complete compaction mutation")
m.Lock() m.Lock()
defer m.Unlock() defer m.Unlock()
...@@ -938,7 +943,11 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac ...@@ -938,7 +943,11 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
} }
newAddedDeltalogs := m.updateDeltalogs(originDeltalogs, deletedDeltalogs, nil) newAddedDeltalogs := m.updateDeltalogs(originDeltalogs, deletedDeltalogs, nil)
deltalogs := append(result.GetDeltalogs(), newAddedDeltalogs...) copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, result.GetSegmentID())
if err != nil {
return nil, nil, nil, err
}
deltalogs := append(result.GetDeltalogs(), copiedDeltalogs...)
compactionFrom := make([]UniqueID, 0, len(modSegments)) compactionFrom := make([]UniqueID, 0, len(modSegments))
for _, s := range modSegments { for _, s := range modSegments {
...@@ -970,7 +979,29 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac ...@@ -970,7 +979,29 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
zap.Int64("new segment num of rows", segment.GetNumOfRows()), zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom())) zap.Any("compacted from", segment.GetCompactionFrom()))
return oldSegments, modSegments, segment return oldSegments, modSegments, segment, nil
}
func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) {
ret := make([]*datapb.FieldBinlog, 0, len(binlogs))
for _, fieldBinlog := range binlogs {
fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog)
for _, binlog := range fieldBinlog.Binlogs {
blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID)
blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath)
if err != nil {
return nil, err
}
err = m.chunkManager.Write(m.ctx, blobPath, blob)
if err != nil {
return nil, err
}
binlog.LogPath = blobPath
}
ret = append(ret, fieldBinlog)
}
return ret, nil
} }
func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error { func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error {
......
...@@ -187,55 +187,55 @@ func (mek *mockKvIllegalStatslog) LoadWithPrefix(key string) ([]string, []string ...@@ -187,55 +187,55 @@ func (mek *mockKvIllegalStatslog) LoadWithPrefix(key string) ([]string, []string
func TestMetaReloadFromKV(t *testing.T) { func TestMetaReloadFromKV(t *testing.T) {
t.Run("Test ReloadFromKV success", func(t *testing.T) { t.Run("Test ReloadFromKV success", func(t *testing.T) {
fkv := &mockEtcdKv{} fkv := &mockEtcdKv{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
}) })
// load segment error // load segment error
t.Run("Test ReloadFromKV load segment fails", func(t *testing.T) { t.Run("Test ReloadFromKV load segment fails", func(t *testing.T) {
fkv := &mockKvLoadSegmentError{} fkv := &mockKvLoadSegmentError{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
// illegal segment info // illegal segment info
t.Run("Test ReloadFromKV unmarshal segment fails", func(t *testing.T) { t.Run("Test ReloadFromKV unmarshal segment fails", func(t *testing.T) {
fkv := &mockKvIllegalSegment{} fkv := &mockKvIllegalSegment{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
// load binlog/deltalog/statslog error // load binlog/deltalog/statslog error
t.Run("Test ReloadFromKV load binlog fails", func(t *testing.T) { t.Run("Test ReloadFromKV load binlog fails", func(t *testing.T) {
fkv := &mockKvLoadBinlogError{} fkv := &mockKvLoadBinlogError{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("Test ReloadFromKV load deltalog fails", func(t *testing.T) { t.Run("Test ReloadFromKV load deltalog fails", func(t *testing.T) {
fkv := &mockKvLoadDeltaBinlogError{} fkv := &mockKvLoadDeltaBinlogError{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("Test ReloadFromKV load statslog fails", func(t *testing.T) { t.Run("Test ReloadFromKV load statslog fails", func(t *testing.T) {
fkv := &mockKvLoadStatsBinlogError{} fkv := &mockKvLoadStatsBinlogError{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
// illegal binlog/deltalog/statslog info // illegal binlog/deltalog/statslog info
t.Run("Test ReloadFromKV unmarshal binlog fails", func(t *testing.T) { t.Run("Test ReloadFromKV unmarshal binlog fails", func(t *testing.T) {
fkv := &mockKvIllegalBinlog{} fkv := &mockKvIllegalBinlog{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("Test ReloadFromKV unmarshal deltalog fails", func(t *testing.T) { t.Run("Test ReloadFromKV unmarshal deltalog fails", func(t *testing.T) {
fkv := &mockKvIllegalDeltalog{} fkv := &mockKvIllegalDeltalog{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
t.Run("Test ReloadFromKV unmarshal statslog fails", func(t *testing.T) { t.Run("Test ReloadFromKV unmarshal statslog fails", func(t *testing.T) {
fkv := &mockKvIllegalStatslog{} fkv := &mockKvIllegalStatslog{}
_, err := newMeta(context.TODO(), fkv, "") _, err := newMeta(context.TODO(), fkv, "", nil)
assert.NotNil(t, err) assert.NotNil(t, err)
}) })
} }
...@@ -370,14 +370,14 @@ func TestMeta_Basic(t *testing.T) { ...@@ -370,14 +370,14 @@ func TestMeta_Basic(t *testing.T) {
// inject error for `Save` // inject error for `Save`
memoryKV := memkv.NewMemoryKV() memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV} fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := newMeta(context.TODO(), fkv, "") meta, err := newMeta(context.TODO(), fkv, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{})) err = meta.AddSegment(NewSegmentInfo(&datapb.SegmentInfo{}))
assert.NotNil(t, err) assert.NotNil(t, err)
fkv2 := &removeFailKV{TxnKV: memoryKV} fkv2 := &removeFailKV{TxnKV: memoryKV}
meta, err = newMeta(context.TODO(), fkv2, "") meta, err = newMeta(context.TODO(), fkv2, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
// nil, since no segment yet // nil, since no segment yet
err = meta.DropSegment(0) err = meta.DropSegment(0)
...@@ -389,7 +389,7 @@ func TestMeta_Basic(t *testing.T) { ...@@ -389,7 +389,7 @@ func TestMeta_Basic(t *testing.T) {
err = meta.DropSegment(0) err = meta.DropSegment(0)
assert.NotNil(t, err) assert.NotNil(t, err)
meta, err = newMeta(context.TODO(), fkv, "") meta, err = newMeta(context.TODO(), fkv, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
}) })
...@@ -523,7 +523,7 @@ func TestGetUnFlushedSegments(t *testing.T) { ...@@ -523,7 +523,7 @@ func TestGetUnFlushedSegments(t *testing.T) {
func TestUpdateFlushSegmentsInfo(t *testing.T) { func TestUpdateFlushSegmentsInfo(t *testing.T) {
t.Run("normal", func(t *testing.T) { t.Run("normal", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
assert.Nil(t, err) assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0")}, segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog0")},
...@@ -561,7 +561,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { ...@@ -561,7 +561,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
}) })
t.Run("update non-existed segment", func(t *testing.T) { t.Run("update non-existed segment", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
assert.Nil(t, err) assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, false, false, false, nil, nil, nil, nil, nil) err = meta.UpdateFlushSegmentsInfo(1, false, false, false, nil, nil, nil, nil, nil)
...@@ -569,7 +569,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { ...@@ -569,7 +569,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
}) })
t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) { t.Run("update checkpoints and start position of non existed segment", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
assert.Nil(t, err) assert.Nil(t, err)
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}} segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing}}
...@@ -586,7 +586,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { ...@@ -586,7 +586,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
t.Run("test save etcd failed", func(t *testing.T) { t.Run("test save etcd failed", func(t *testing.T) {
kv := memkv.NewMemoryKV() kv := memkv.NewMemoryKV()
failedKv := &saveFailKV{kv} failedKv := &saveFailKV{kv}
meta, err := newMeta(context.TODO(), failedKv, "") meta, err := newMeta(context.TODO(), failedKv, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
segmentInfo := &SegmentInfo{ segmentInfo := &SegmentInfo{
...@@ -614,7 +614,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) { ...@@ -614,7 +614,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
func TestSaveHandoffMeta(t *testing.T) { func TestSaveHandoffMeta(t *testing.T) {
kvClient := memkv.NewMemoryKV() kvClient := memkv.NewMemoryKV()
meta, err := newMeta(context.TODO(), kvClient, "") meta, err := newMeta(context.TODO(), kvClient, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
info := &datapb.SegmentInfo{ info := &datapb.SegmentInfo{
...@@ -785,7 +785,8 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) { ...@@ -785,7 +785,8 @@ func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")}, Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")},
NumOfRows: 1, NumOfRows: 1,
} }
beforeCompact, afterCompact, newSegment := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult) beforeCompact, afterCompact, newSegment, err := m.PrepareCompleteCompactionMutation(inCompactionLogs, inCompactionResult)
assert.Nil(t, err)
assert.NotNil(t, beforeCompact) assert.NotNil(t, beforeCompact)
assert.NotNil(t, afterCompact) assert.NotNil(t, afterCompact)
assert.NotNil(t, newSegment) assert.NotNil(t, newSegment)
...@@ -1064,7 +1065,7 @@ func TestChannelCP(t *testing.T) { ...@@ -1064,7 +1065,7 @@ func TestChannelCP(t *testing.T) {
} }
t.Run("UpdateChannelCheckpoint", func(t *testing.T) { t.Run("UpdateChannelCheckpoint", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
assert.NoError(t, err) assert.NoError(t, err)
// nil position // nil position
...@@ -1076,7 +1077,7 @@ func TestChannelCP(t *testing.T) { ...@@ -1076,7 +1077,7 @@ func TestChannelCP(t *testing.T) {
}) })
t.Run("GetChannelCheckpoint", func(t *testing.T) { t.Run("GetChannelCheckpoint", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
assert.NoError(t, err) assert.NoError(t, err)
position := meta.GetChannelCheckpoint(mockVChannel) position := meta.GetChannelCheckpoint(mockVChannel)
...@@ -1091,7 +1092,7 @@ func TestChannelCP(t *testing.T) { ...@@ -1091,7 +1092,7 @@ func TestChannelCP(t *testing.T) {
}) })
t.Run("DropChannelCheckpoint", func(t *testing.T) { t.Run("DropChannelCheckpoint", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "") meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "", nil)
assert.NoError(t, err) assert.NoError(t, err)
err = meta.DropChannelCheckpoint(mockVChannel) err = meta.DropChannelCheckpoint(mockVChannel)
......
...@@ -40,7 +40,7 @@ import ( ...@@ -40,7 +40,7 @@ import (
func newMemoryMeta() (*meta, error) { func newMemoryMeta() (*meta, error) {
memoryKV := memkv.NewMemoryKV() memoryKV := memkv.NewMemoryKV()
return newMeta(context.TODO(), memoryKV, "") return newMeta(context.TODO(), memoryKV, "", nil)
} }
var _ allocator = (*MockAllocator)(nil) var _ allocator = (*MockAllocator)(nil)
......
...@@ -492,7 +492,7 @@ func TestTryToSealSegment(t *testing.T) { ...@@ -492,7 +492,7 @@ func TestTryToSealSegment(t *testing.T) {
mockAllocator := newMockAllocator() mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV() memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV} fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := newMeta(context.TODO(), memoryKV, "") meta, err := newMeta(context.TODO(), memoryKV, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
...@@ -518,7 +518,7 @@ func TestTryToSealSegment(t *testing.T) { ...@@ -518,7 +518,7 @@ func TestTryToSealSegment(t *testing.T) {
mockAllocator := newMockAllocator() mockAllocator := newMockAllocator()
memoryKV := memkv.NewMemoryKV() memoryKV := memkv.NewMemoryKV()
fkv := &saveFailKV{TxnKV: memoryKV} fkv := &saveFailKV{TxnKV: memoryKV}
meta, err := newMeta(context.TODO(), memoryKV, "") meta, err := newMeta(context.TODO(), memoryKV, "", nil)
assert.Nil(t, err) assert.Nil(t, err)
......
...@@ -262,7 +262,7 @@ func (s *Server) Init() error { ...@@ -262,7 +262,7 @@ func (s *Server) Init() error {
return err return err
} }
if err = s.initMeta(storageCli.RootPath()); err != nil { if err = s.initMeta(storageCli.RootPath(), storageCli); err != nil {
return err return err
} }
...@@ -454,12 +454,13 @@ func (s *Server) initSegmentManager() { ...@@ -454,12 +454,13 @@ func (s *Server) initSegmentManager() {
} }
} }
func (s *Server) initMeta(chunkManagerRootPath string) error { func (s *Server) initMeta(chunkManagerRootPath string, chunkManager storage.ChunkManager) error {
etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue()) etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
s.kvClient = etcdKV s.kvClient = etcdKV
reloadEtcdFn := func() error { reloadEtcdFn := func() error {
var err error var err error
s.meta, err = newMeta(s.ctx, s.kvClient, chunkManagerRootPath) s.meta, err = newMeta(s.ctx, s.kvClient, chunkManagerRootPath, chunkManager)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -21,17 +21,21 @@ import ( ...@@ -21,17 +21,21 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
"path"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/tsoutil"
...@@ -77,8 +81,9 @@ type compactionTask struct { ...@@ -77,8 +81,9 @@ type compactionTask struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
tr *timerecord.TimeRecorder tr *timerecord.TimeRecorder
chunkManager storage.ChunkManager
} }
// check if compactionTask implements compactor // check if compactionTask implements compactor
...@@ -91,7 +96,8 @@ func newCompactionTask( ...@@ -91,7 +96,8 @@ func newCompactionTask(
channel Channel, channel Channel,
fm flushManager, fm flushManager,
alloc allocatorInterface, alloc allocatorInterface,
plan *datapb.CompactionPlan) *compactionTask { plan *datapb.CompactionPlan,
chunkManager storage.ChunkManager) *compactionTask {
ctx1, cancel := context.WithCancel(ctx) ctx1, cancel := context.WithCancel(ctx)
return &compactionTask{ return &compactionTask{
...@@ -105,6 +111,7 @@ func newCompactionTask( ...@@ -105,6 +111,7 @@ func newCompactionTask(
allocatorInterface: alloc, allocatorInterface: alloc,
plan: plan, plan: plan,
tr: timerecord.NewTimeRecorder("compactionTask"), tr: timerecord.NewTimeRecorder("compactionTask"),
chunkManager: chunkManager,
} }
} }
...@@ -458,7 +465,96 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) { ...@@ -458,7 +465,96 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
// Inject to stop flush // Inject to stop flush
injectStart := time.Now() injectStart := time.Now()
ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) { ti := newTaskInjection(len(segIDs), func(pack *segmentFlushPack) {
collectionID := meta.GetID()
pack.segmentID = targetSegID pack.segmentID = targetSegID
for _, insertLog := range pack.insertLogs {
splits := strings.Split(insertLog.LogPath, "/")
if len(splits) < 2 {
pack.err = fmt.Errorf("bad insert log path: %s", insertLog.LogPath)
return
}
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
pack.err = err
return
}
fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
pack.err = err
return
}
blobKey := metautil.JoinIDPath(collectionID, partID, targetSegID, fieldID, logID)
blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentInsertLogPath, blobKey)
blob, err := t.chunkManager.Read(t.ctx, insertLog.LogPath)
if err != nil {
pack.err = err
return
}
err = t.chunkManager.Write(t.ctx, blobPath, blob)
if err != nil {
pack.err = err
return
}
insertLog.LogPath = blobPath
}
for _, deltaLog := range pack.deltaLogs {
splits := strings.Split(deltaLog.LogPath, "/")
if len(splits) < 1 {
pack.err = fmt.Errorf("delta stats log path: %s", deltaLog.LogPath)
return
}
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
pack.err = err
return
}
blobKey := metautil.JoinIDPath(collectionID, partID, targetSegID, logID)
blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
blob, err := t.chunkManager.Read(t.ctx, deltaLog.LogPath)
if err != nil {
pack.err = err
return
}
err = t.chunkManager.Write(t.ctx, blobPath, blob)
if err != nil {
pack.err = err
return
}
deltaLog.LogPath = blobPath
}
for _, statsLog := range pack.statsLogs {
splits := strings.Split(statsLog.LogPath, "/")
if len(splits) < 2 {
pack.err = fmt.Errorf("bad stats log path: %s", statsLog.LogPath)
return
}
logID, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil {
pack.err = err
return
}
fieldID, err := strconv.ParseInt(splits[len(splits)-2], 10, 64)
if err != nil {
pack.err = err
return
}
blobKey := metautil.JoinIDPath(collectionID, partID, targetSegID, fieldID, logID)
blobPath := path.Join(t.chunkManager.RootPath(), common.SegmentStatslogPath, blobKey)
blob, err := t.chunkManager.Read(t.ctx, statsLog.LogPath)
if err != nil {
pack.err = err
return
}
err = t.chunkManager.Write(t.ctx, blobPath, blob)
if err != nil {
pack.err = err
return
}
statsLog.LogPath = blobPath
}
}) })
defer close(ti.injectOver) defer close(ti.injectOver)
......
...@@ -691,7 +691,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { ...@@ -691,7 +691,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
} }
alloc.random = false // generated ID = 19530 alloc.random = false // generated ID = 19530
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan) task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan, nil)
result, err := task.compact() result, err := task.compact()
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, result) assert.NotNil(t, result)
...@@ -822,7 +822,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { ...@@ -822,7 +822,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
} }
alloc.random = false // generated ID = 19530 alloc.random = false // generated ID = 19530
task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan) task := newCompactionTask(context.TODO(), mockbIO, mockbIO, channel, mockfm, alloc, plan, nil)
result, err := task.compact() result, err := task.compact()
assert.NoError(t, err) assert.NoError(t, err)
assert.NotNil(t, result) assert.NotNil(t, result)
......
...@@ -844,6 +844,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan ...@@ -844,6 +844,7 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
ds.flushManager, ds.flushManager,
ds.idAllocator, ds.idAllocator,
req, req,
node.chunkManager,
) )
node.compactionExecutor.execute(task) node.compactionExecutor.execute(task)
......
...@@ -36,6 +36,7 @@ import ( ...@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/util/flowgraph" "github.com/milvus-io/milvus/internal/util/flowgraph"
"github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
) )
// dataSyncService controls a flowgraph for a specific collection // dataSyncService controls a flowgraph for a specific collection
...@@ -160,7 +161,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro ...@@ -160,7 +161,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx) dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
// initialize flush manager for DataSync Service // initialize flush manager for DataSync Service
dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel, dsService.flushManager = NewRendezvousFlushManager(dsService.idAllocator, dsService.chunkManager, dsService.channel,
flushNotifyFunc(dsService), dropVirtualChannelFunc(dsService)) flushNotifyFunc(dsService, retry.Attempts(50)), dropVirtualChannelFunc(dsService))
var err error var err error
// recover segment checkpoints // recover segment checkpoints
......
...@@ -822,7 +822,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet ...@@ -822,7 +822,7 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
log.Warn("failed to SaveBinlogPaths", log.Warn("failed to SaveBinlogPaths",
zap.Int64("segment ID", pack.segmentID), zap.Int64("segment ID", pack.segmentID),
zap.Error(errors.New(rsp.GetReason()))) zap.Error(errors.New(rsp.GetReason())))
return nil return fmt.Errorf("segment %d not found", pack.segmentID)
} }
// meta error, datanode handles a virtual channel does not belong here // meta error, datanode handles a virtual channel does not belong here
if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed { if rsp.GetErrorCode() == commonpb.ErrorCode_MetaFailed {
......
...@@ -583,13 +583,6 @@ func TestFlushNotifyFunc(t *testing.T) { ...@@ -583,13 +583,6 @@ func TestFlushNotifyFunc(t *testing.T) {
}) })
}) })
t.Run("stale segment not found", func(t *testing.T) {
dataCoord.SaveBinlogPathStatus = commonpb.ErrorCode_SegmentNotFound
assert.NotPanics(t, func() {
notifyFunc(&segmentFlushPack{flushed: false})
})
})
// issue https://github.com/milvus-io/milvus/issues/17097 // issue https://github.com/milvus-io/milvus/issues/17097
// meta error, datanode shall not panic, just drop the virtual channel // meta error, datanode shall not panic, just drop the virtual channel
t.Run("datacoord found meta error", func(t *testing.T) { t.Run("datacoord found meta error", func(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册