From 99b958e360e9e61c15c079c9cf164f64400c4a88 Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Mon, 31 Oct 2022 19:27:35 +0800 Subject: [PATCH] Fix BF Concurrency issue (#20211) Signed-off-by: xiaofan-luan Signed-off-by: xiaofan-luan --- internal/datanode/segment.go | 6 ++++++ internal/querynode/segment.go | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/internal/datanode/segment.go b/internal/datanode/segment.go index c18c7b3b3..56a895dd2 100644 --- a/internal/datanode/segment.go +++ b/internal/datanode/segment.go @@ -17,6 +17,7 @@ package datanode import ( + "sync" "sync/atomic" "github.com/bits-and-blooms/bloom/v3" @@ -36,6 +37,7 @@ type Segment struct { memorySize int64 compactedTo UniqueID + statLock sync.Mutex currentStat *storage.PkStatistics historyStats []*storage.PkStatistics @@ -70,6 +72,8 @@ func (s *Segment) setType(t datapb.SegmentType) { } func (s *Segment) updatePKRange(ids storage.FieldData) { + s.statLock.Lock() + defer s.statLock.Unlock() s.InitCurrentStat() err := s.currentStat.UpdatePKRange(ids) if err != nil { @@ -87,6 +91,8 @@ func (s *Segment) InitCurrentStat() { // check if PK exists is current func (s *Segment) isPKExist(pk primaryKey) bool { + s.statLock.Lock() + defer s.statLock.Unlock() if s.currentStat != nil && s.currentStat.PkExist(pk) { return true } diff --git a/internal/querynode/segment.go b/internal/querynode/segment.go index 27fb542d2..c4174ec12 100644 --- a/internal/querynode/segment.go +++ b/internal/querynode/segment.go @@ -95,6 +95,7 @@ type Segment struct { indexedFieldInfos *typeutil.ConcurrentMap[UniqueID, *IndexedFieldInfo] + statLock sync.Mutex // only used by sealed segments currentStat *storage.PkStatistics historyStats []*storage.PkStatistics @@ -618,6 +619,8 @@ func (s *Segment) fillIndexedFieldsData(ctx context.Context, collectionID Unique } func (s *Segment) updateBloomFilter(pks []primaryKey) { + s.statLock.Lock() + defer s.statLock.Unlock() s.InitCurrentStat() buf := make([]byte, 8) for _, pk := range pks { @@ -647,6 +650,8 @@ func (s *Segment) InitCurrentStat() { // check if PK exists is current func (s *Segment) isPKExist(pk primaryKey) bool { + s.statLock.Lock() + defer s.statLock.Unlock() if s.currentStat != nil && s.currentStat.PkExist(pk) { return true } -- GitLab