未验证 提交 5471e35c 编写于 作者: C congqixia 提交者: GitHub

Wait last version search to release growing safely (#17713)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 fcfca9a7
......@@ -128,12 +128,14 @@ type ShardCluster struct {
segmentDetector ShardSegmentDetector
nodeBuilder ShardNodeBuilder
mut sync.RWMutex
leader *shardNode // shard leader node instance
nodes map[int64]*shardNode // online nodes
segments SegmentsStatus // shard segments
versions map[int64]*ShardClusterVersion // version id to version map
currentVersion *ShardClusterVersion // current serving segment state version
mut sync.RWMutex
leader *shardNode // shard leader node instance
nodes map[int64]*shardNode // online nodes
segments SegmentsStatus // shard segments
mutVersion sync.RWMutex
versions sync.Map // version id to version
currentVersion *ShardClusterVersion // current serving segment state version
nextVersionID *atomic.Int64
segmentCond *sync.Cond // segment state change condition
rcCond *sync.Cond // segment rc change condition
......@@ -158,7 +160,6 @@ func NewShardCluster(collectionID int64, replicaID int64, vchannelName string,
nodes: make(map[int64]*shardNode),
segments: make(map[int64]shardSegmentInfo),
versions: make(map[int64]*ShardClusterVersion),
nextVersionID: atomic.NewInt64(0),
closeCh: make(chan struct{}),
......@@ -189,8 +190,8 @@ func (sc *ShardCluster) serviceable() bool {
return false
}
sc.mut.RLock()
defer sc.mut.RUnlock()
sc.mutVersion.RLock()
defer sc.mutVersion.RUnlock()
// check there is a working version(SyncSegments called)
return sc.currentVersion != nil
}
......@@ -276,15 +277,8 @@ func (sc *ShardCluster) updateSegment(evt shardSegmentInfo) {
// SyncSegments synchronize segment distribution in batch
func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo, state segmentState) {
log.Info("ShardCluster sync segments", zap.Any("replica segments", distribution), zap.Int32("state", int32(state)))
// notify handoff wait online if any
defer func() {
sc.segmentCond.L.Lock()
sc.segmentCond.Broadcast()
sc.segmentCond.L.Unlock()
}()
sc.mut.Lock()
defer sc.mut.Unlock()
sc.mut.Lock()
for _, line := range distribution {
for _, segmentID := range line.GetSegmentIds() {
old, ok := sc.segments[segmentID]
......@@ -306,9 +300,17 @@ func (sc *ShardCluster) SyncSegments(distribution []*querypb.ReplicaSegmentsInfo
})
}
}
sc.mut.Unlock()
// notify handoff wait online if any
sc.segmentCond.L.Lock()
sc.segmentCond.Broadcast()
sc.segmentCond.L.Unlock()
sc.mutVersion.Lock()
defer sc.mutVersion.Unlock()
version := NewShardClusterVersion(sc.nextVersionID.Inc(), sc.segments.Clone(filterNothing))
sc.versions[version.versionID] = version
sc.versions.Store(version.versionID, version)
sc.currentVersion = version
}
......@@ -498,8 +500,8 @@ func (sc *ShardCluster) segmentAllocations(partitionIDs []int64) (map[int64][]in
log.Warn("request segment allocations when cluster is not serviceable", zap.Int64("collectionID", sc.collectionID), zap.Int64("replicaID", sc.replicaID), zap.String("vchannelName", sc.vchannelName))
return map[int64][]int64{}, 0
}
sc.mut.RLock()
defer sc.mut.RUnlock()
sc.mutVersion.RLock()
defer sc.mutVersion.RUnlock()
// return allocation from current version and version id
return sc.currentVersion.GetAllocation(partitionIDs), sc.currentVersion.versionID
}
......@@ -511,11 +513,10 @@ func (sc *ShardCluster) finishUsage(versionID int64) {
sc.rcCond.Broadcast()
sc.rcCond.L.Unlock()
}()
sc.mut.Lock()
defer sc.mut.Unlock()
version, ok := sc.versions[versionID]
v, ok := sc.versions.Load(versionID)
if ok {
version := v.(*ShardClusterVersion)
version.FinishUsage()
}
}
......@@ -540,10 +541,7 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
// now online segment can provide service, generate a new version
// add segmentChangeInfo to pending list
signal, versionID := sc.appendHandoff(info)
// wait last version not in use
<-signal
versionID := sc.applySegmentChange(info, onlineSegmentIDs)
removes := make(map[int64][]int64) // nodeID => []segmentIDs
// remove offline segments record
......@@ -558,16 +556,6 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
}
var errs errorutil.ErrorList
// remove growing segments if any
// handles the case for Growing to Sealed Handoff(which does not has offline segment info)
if sc.leader != nil {
// error ignored here
sc.leader.client.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
CollectionID: sc.collectionID,
SegmentIDs: onlineSegmentIDs,
Scope: querypb.DataScope_Streaming,
})
}
// notify querynode(s) to release segments
for nodeID, segmentIDs := range removes {
......@@ -601,9 +589,9 @@ func (sc *ShardCluster) HandoffSegments(info *querypb.SegmentChangeInfo) error {
}
// appendHandoff adds the change info into pending list and returns the token.
func (sc *ShardCluster) appendHandoff(info *querypb.SegmentChangeInfo) (chan struct{}, int64) {
sc.mut.Lock()
defer sc.mut.Unlock()
func (sc *ShardCluster) applySegmentChange(info *querypb.SegmentChangeInfo, onlineSegmentIDs []UniqueID) int64 {
sc.mutVersion.Lock()
defer sc.mutVersion.Unlock()
// generate a new version
versionID := sc.nextVersionID.Inc()
......@@ -617,33 +605,58 @@ func (sc *ShardCluster) appendHandoff(info *querypb.SegmentChangeInfo) (chan str
}
return false
}))
sc.versions[versionID] = version
sc.versions.Store(versionID, version)
var lastVersionID int64
// signal for nil current version case, no need to wait
signal := make(chan struct{})
close(signal)
/*
----------------------------------------------------------------------------
T0 |T1(S2 online)| T2(change version)|T3(remove G2)|
----------------------------------------------------------------------------
G2, G3 |G2, G3 | G2, G3 | G3
----------------------------------------------------------------------------
S1 |S1, S2 | S1, S2 | S1,S2
----------------------------------------------------------------------------
v0=[S1] |v0=[S1] | v1=[S1,S2] | v1=[S1,S2]
There is no method to ensure search after T2 does not search G2 so that it
could be removed safely
Currently, the only safe method is to block incoming allocation, so there is no
search will be dispatch to G2.
After shard cluster is able to maintain growing semgents, this version change could
reduce the lock range
*/
// currentVersion shall be not nil
if sc.currentVersion != nil {
signal = sc.currentVersion.Expire()
// wait for last version search done
<-sc.currentVersion.Expire()
lastVersionID = sc.currentVersion.versionID
// remove growing segments if any
// handles the case for Growing to Sealed Handoff(which does not has offline segment info)
if sc.leader != nil {
// error ignored here
sc.leader.client.ReleaseSegments(context.Background(), &querypb.ReleaseSegmentsRequest{
CollectionID: sc.collectionID,
SegmentIDs: onlineSegmentIDs,
Scope: querypb.DataScope_Streaming,
})
}
}
// set current version to new one
sc.currentVersion = version
sc.versions[versionID] = version
return signal, lastVersionID
return lastVersionID
}
// cleanupVersion clean up version from map
func (sc *ShardCluster) cleanupVersion(versionID int64) {
sc.mut.Lock()
defer sc.mut.Unlock()
sc.mutVersion.RLock()
defer sc.mutVersion.RUnlock()
// prevent clean up current version
if sc.currentVersion != nil && sc.currentVersion.versionID == versionID {
return
}
delete(sc.versions, versionID)
sc.versions.Delete(versionID)
}
// waitSegmentsOnline waits until all provided segments is loaded.
......
......@@ -1652,6 +1652,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
{
nodeID: 1,
nodeAddr: "addr_1",
isLeader: true,
},
{
nodeID: 2,
......@@ -1705,6 +1706,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
{
nodeID: 1,
nodeAddr: "addr_1",
isLeader: true,
},
{
nodeID: 2,
......@@ -1755,6 +1757,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
{
nodeID: 1,
nodeAddr: "addr_1",
isLeader: true,
},
{
nodeID: 2,
......@@ -1804,6 +1807,8 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
close(sig)
}()
sc.finishUsage(versionID)
evtCh <- segmentEvent{
eventType: segmentAdd,
segmentID: 3,
......@@ -1847,6 +1852,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
{
nodeID: 1,
nodeAddr: "addr_1",
isLeader: true,
},
{
nodeID: 2,
......@@ -1902,6 +1908,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
nodeIDs: []int64{2},
state: segmentStateLoaded,
}
sc.finishUsage(versionID)
// wait for handoff appended into list
assert.Eventually(t, func() bool {
......@@ -1938,6 +1945,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
{
nodeID: 1,
nodeAddr: "addr_1",
isLeader: true,
},
{
nodeID: 2,
......@@ -1986,6 +1994,7 @@ func TestShardCluster_HandoffSegments(t *testing.T) {
{
nodeID: 1,
nodeAddr: "addr_1",
isLeader: true,
},
{
nodeID: 2,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册