From c9174d55bade2c1b0d34415152997d4a4de1347c Mon Sep 17 00:00:00 2001 From: jaime Date: Thu, 28 Jul 2022 14:52:31 +0800 Subject: [PATCH] Refine merge operation during compacting phase (#18399) Signed-off-by: yun.zhang --- internal/datacoord/compaction_trigger.go | 7 ++++++- internal/datanode/compactor.go | 16 +++++++--------- internal/datanode/compactor_test.go | 13 +++++-------- internal/storage/primary_key.go | 9 +++++++++ 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index a2f77a0d6..5b415b617 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -253,7 +253,12 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) { continue } - log.Info("time cost of generating global compaction", zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()), + segIDs := make(map[int64][]*datapb.FieldBinlog, len(plan.SegmentBinlogs)) + for _, seg := range plan.SegmentBinlogs { + segIDs[seg.SegmentID] = seg.Deltalogs + } + + log.Info("time cost of generating global compaction", zap.Any("segID2DeltaLogs", segIDs), zap.Int64("planID", plan.PlanID), zap.Any("time cost", time.Since(start).Milliseconds()), zap.Int64("collectionID", signal.collectionID), zap.String("channel", group.channelName), zap.Int64("partitionID", group.partitionID)) } } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 99b81b391..d54b8de65 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -133,12 +133,12 @@ func (t *compactionTask) getChannelName() string { return t.plan.GetChannel() } -func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[primaryKey]Timestamp, *DelDataBuf, error) { +func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (map[interface{}]Timestamp, *DelDataBuf, error) { mergeStart := time.Now() dCodec := storage.NewDeleteCodec() var ( - pk2ts = make(map[primaryKey]Timestamp) + pk2ts = make(map[interface{}]Timestamp) dbuff = &DelDataBuf{ delData: &DeleteData{ Pks: make([]primaryKey, 0), @@ -162,7 +162,7 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT ts := dData.Tss[i] if timetravelTs != Timestamp(0) && dData.Tss[i] <= timetravelTs { - pk2ts[pk] = ts + pk2ts[pk.GetValue()] = ts continue } @@ -191,7 +191,7 @@ func nano2Milli(nano time.Duration) float64 { return float64(nano) / float64(time.Millisecond) } -func (t *compactionTask) merge(mergeItr iterator, delta map[primaryKey]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) { +func (t *compactionTask) merge(mergeItr iterator, delta map[interface{}]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) { mergeStart := time.Now() var ( @@ -207,12 +207,10 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[primaryKey]Timestamp ) isDeletedValue := func(v *storage.Value) bool { - for pk, ts := range delta { - if pk.EQ(v.PK) && uint64(v.Timestamp) <= ts { - return true - } + ts, ok := delta[v.PK.GetValue()] + if ok && uint64(v.Timestamp) <= ts { + return true } - return false } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index eca31f81d..fc4e3f41d 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -259,9 +259,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) { mitr := storage.NewMergeIterator([]iterator{iitr}) - pk := newInt64PrimaryKey(1) - dm := map[primaryKey]Timestamp{ - pk: 10000, + dm := map[interface{}]Timestamp{ + 1: 10000, } ct := &compactionTask{} @@ -289,7 +288,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) { mitr := storage.NewMergeIterator([]iterator{iitr}) - dm := map[primaryKey]Timestamp{} + dm := map[interface{}]Timestamp{} ct := &compactionTask{} idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime()) @@ -311,10 +310,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) { require.NoError(t, err) mitr := storage.NewMergeIterator([]iterator{iitr}) - - pk := newInt64PrimaryKey(1) - dm := map[primaryKey]Timestamp{ - pk: 10000, + dm := map[interface{}]Timestamp{ + 1: 10000, } ct := &compactionTask{} diff --git a/internal/storage/primary_key.go b/internal/storage/primary_key.go index 771e1ba5c..8320efb28 100644 --- a/internal/storage/primary_key.go +++ b/internal/storage/primary_key.go @@ -34,6 +34,7 @@ type PrimaryKey interface { MarshalJSON() ([]byte, error) UnmarshalJSON(data []byte) error SetValue(interface{}) error + GetValue() interface{} Type() schemapb.DataType } @@ -147,6 +148,10 @@ func (ip *Int64PrimaryKey) Type() schemapb.DataType { return schemapb.DataType_Int64 } +func (ip *Int64PrimaryKey) GetValue() interface{} { + return ip.Value +} + type BaseStringPrimaryKey struct { Value string } @@ -199,6 +204,10 @@ func (sp *BaseStringPrimaryKey) SetValue(data interface{}) error { return nil } +func (sp *BaseStringPrimaryKey) GetValue() interface{} { + return sp.Value +} + type VarCharPrimaryKey struct { BaseStringPrimaryKey } -- GitLab