未验证 提交 b9d3897d 编写于 作者: B bigsheeper 提交者: GitHub

Return error when any failed detected in loadSegment (#7005)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 85ea051d
......@@ -203,6 +203,10 @@ func deleteSegment(segment *Segment) {
void
deleteSegment(CSegmentInterface segment);
*/
if segment.segmentPtr == nil {
return
}
segment.segPtrMu.Lock()
defer segment.segPtrMu.Unlock()
cPtr := segment.segmentPtr
......
......@@ -70,6 +70,22 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
return nil
}
newSegments := make([]*Segment, 0)
segmentGC := func() {
for _, s := range newSegments {
deleteSegment(s)
}
}
setSegments := func() {
for _, s := range newSegments {
err := loader.historicalReplica.setSegment(s)
if err != nil {
log.Warn(err.Error())
deleteSegment(s)
}
}
}
// start to load
for _, info := range req.Infos {
segmentID := info.SegmentID
......@@ -79,20 +95,16 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
collection, err := loader.historicalReplica.getCollectionByID(collectionID)
if err != nil {
log.Warn(err.Error())
continue
segmentGC()
return err
}
segment := newSegment(collection, segmentID, partitionID, collectionID, "", segmentTypeSealed, onService)
err = loader.loadSegmentInternal(collectionID, segment, info)
if err != nil {
deleteSegment(segment)
log.Warn(err.Error())
continue
}
err = loader.historicalReplica.setSegment(segment)
if err != nil {
deleteSegment(segment)
log.Warn(err.Error())
continue
segmentGC()
return err
}
if onService {
key := fmt.Sprintf("%s/%d", queryCoordSegmentMetaPrefix, segmentID)
......@@ -100,14 +112,16 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
if err != nil {
deleteSegment(segment)
log.Warn("error when load segment info from etcd", zap.Any("error", err.Error()))
continue
segmentGC()
return err
}
segmentInfo := &querypb.SegmentInfo{}
err = proto.UnmarshalText(value, segmentInfo)
if err != nil {
deleteSegment(segment)
log.Warn("error when unmarshal segment info from etcd", zap.Any("error", err.Error()))
continue
segmentGC()
return err
}
segmentInfo.SegmentState = querypb.SegmentState_sealed
newKey := fmt.Sprintf("%s/%d", queryNodeSegmentMetaPrefix, segmentID)
......@@ -115,9 +129,13 @@ func (loader *segmentLoader) loadSegment(req *querypb.LoadSegmentsRequest, onSer
if err != nil {
deleteSegment(segment)
log.Warn("error when update segment info to etcd", zap.Any("error", err.Error()))
segmentGC()
return err
}
}
newSegments = append(newSegments, segment)
}
setSegments()
// sendQueryNodeStats
return loader.indexLoader.sendQueryNodeStats()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册