提交 b07b2484 编写于 作者: B bigsheeper 提交者: yefu.chen

Fix error when loading

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
上级 8f4995da
...@@ -397,17 +397,14 @@ func (dct *DropCollectionTask) Execute() error { ...@@ -397,17 +397,14 @@ func (dct *DropCollectionTask) Execute() error {
if err != nil { if err != nil {
return err return err
} }
dct.result, _ = dct.masterClient.DropCollection(dct.DropCollectionRequest)
dct.result, err = dct.masterClient.DropCollection(dct.DropCollectionRequest) if dct.result.ErrorCode != commonpb.ErrorCode_SUCCESS {
if err != nil { return errors.New(dct.result.Reason)
return err
} }
err = globalInsertChannelsMap.closeInsertMsgStream(collID) err = globalInsertChannelsMap.closeInsertMsgStream(collID)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
......
...@@ -61,6 +61,7 @@ type collectionReplica interface { ...@@ -61,6 +61,7 @@ type collectionReplica interface {
getSegmentStatistics() []*internalpb2.SegmentStats getSegmentStatistics() []*internalpb2.SegmentStats
getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) getEnabledSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID)
replaceGrowingSegmentBySealedSegment(segment *Segment) error replaceGrowingSegmentBySealedSegment(segment *Segment) error
getTSafe() tSafe getTSafe() tSafe
...@@ -477,7 +478,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s ...@@ -477,7 +478,7 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s
} }
if segment.getType() == segType { if segment.getType() == segType {
targetCollectionIDs = append(targetCollectionIDs, segment.collectionID) targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
targetPartitionIDs = append(targetPartitionIDs, segment.collectionID) targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
targetSegmentIDs = append(targetSegmentIDs, segment.segmentID) targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
} }
} }
...@@ -486,6 +487,25 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s ...@@ -486,6 +487,25 @@ func (colReplica *collectionReplicaImpl) getEnabledSealedSegmentsBySegmentType(s
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
} }
func (colReplica *collectionReplicaImpl) getSealedSegmentsBySegmentType(segType segmentType) ([]UniqueID, []UniqueID, []UniqueID) {
colReplica.mu.RLock()
defer colReplica.mu.RUnlock()
targetCollectionIDs := make([]UniqueID, 0)
targetPartitionIDs := make([]UniqueID, 0)
targetSegmentIDs := make([]UniqueID, 0)
for _, segment := range colReplica.segments {
if segment.getType() == segType {
targetCollectionIDs = append(targetCollectionIDs, segment.collectionID)
targetPartitionIDs = append(targetPartitionIDs, segment.partitionID)
targetSegmentIDs = append(targetSegmentIDs, segment.segmentID)
}
}
return targetCollectionIDs, targetPartitionIDs, targetSegmentIDs
}
func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error { func (colReplica *collectionReplicaImpl) replaceGrowingSegmentBySealedSegment(segment *Segment) error {
colReplica.mu.Lock() colReplica.mu.Lock()
defer colReplica.mu.Unlock() defer colReplica.mu.Unlock()
......
...@@ -41,8 +41,9 @@ type loadIndex struct { ...@@ -41,8 +41,9 @@ type loadIndex struct {
} }
func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) { func (loader *indexLoader) doLoadIndex(wg *sync.WaitGroup) {
collectionIDs, _, segmentIDs := loader.replica.getEnabledSealedSegmentsBySegmentType(segTypeSealed) collectionIDs, _, segmentIDs := loader.replica.getSealedSegmentsBySegmentType(segTypeSealed)
if len(collectionIDs) <= 0 { if len(collectionIDs) <= 0 {
wg.Done()
return return
} }
fmt.Println("do load index for sealed segments:", segmentIDs) fmt.Println("do load index for sealed segments:", segmentIDs)
......
...@@ -40,8 +40,9 @@ func (s *loadService) close() { ...@@ -40,8 +40,9 @@ func (s *loadService) close() {
} }
func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) { func (s *loadService) loadSegmentActively(wg *sync.WaitGroup) {
collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getEnabledSealedSegmentsBySegmentType(segTypeGrowing) collectionIDs, partitionIDs, segmentIDs := s.segLoader.replica.getSealedSegmentsBySegmentType(segTypeGrowing)
if len(collectionIDs) <= 0 { if len(collectionIDs) <= 0 {
wg.Done()
return return
} }
fmt.Println("do load segment for growing segments:", segmentIDs) fmt.Println("do load segment for growing segments:", segmentIDs)
...@@ -92,7 +93,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni ...@@ -92,7 +93,7 @@ func (s *loadService) loadSegmentInternal(collectionID UniqueID, partitionID Uni
_, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID) _, buildID, errIndex := s.segLoader.indexLoader.getIndexInfo(collectionID, segmentID)
if errIndex == nil { if errIndex == nil {
// we don't need load to vector fields // we don't need load to vector fields
vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(segmentID) vectorFields, err := s.segLoader.replica.getVecFieldIDsByCollectionID(collectionID)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -146,14 +146,14 @@ func (node *QueryNode) Start() error { ...@@ -146,14 +146,14 @@ func (node *QueryNode) Start() error {
// init services and manager // init services and manager
node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica) node.dataSyncService = newDataSyncService(node.queryNodeLoopCtx, node.replica)
node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica) node.searchService = newSearchService(node.queryNodeLoopCtx, node.replica)
node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica) //node.metaService = newMetaService(node.queryNodeLoopCtx, node.replica)
node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream) node.loadService = newLoadService(node.queryNodeLoopCtx, node.masterClient, node.dataClient, node.indexClient, node.replica, node.dataSyncService.dmStream)
node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan) node.statsService = newStatsService(node.queryNodeLoopCtx, node.replica, node.loadService.segLoader.indexLoader.fieldStatsChan)
// start services // start services
go node.dataSyncService.start() go node.dataSyncService.start()
go node.searchService.start() go node.searchService.start()
go node.metaService.start() //go node.metaService.start()
go node.loadService.start() go node.loadService.start()
go node.statsService.start() go node.statsService.start()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册