From 2a2d32c4fc486aee24a48eb1dfdfd73832c59cfb Mon Sep 17 00:00:00 2001
From: congqixia
Date: Tue, 11 Oct 2022 21:39:24 +0800
Subject: [PATCH] Fix datacoord panics when collection info not found (#19706)

Signed-off-by: Congqi Xia
Signed-off-by: Congqi Xia
---
 internal/datacoord/compaction_trigger.go      | 21 +++++++++-----
 internal/datacoord/compaction_trigger_test.go | 28 +++++++++++++++----
 internal/datacoord/garbage_collector.go       |  6 ++--
 internal/datacoord/garbage_collector_test.go  | 16 +++++------
 internal/datacoord/handler.go                 | 18 ++++++------
 internal/datacoord/mock_test.go               | 15 ++++++++++
 internal/datacoord/segment_manager.go         |  1 +
 internal/datacoord/server.go                  |  4 +--
 internal/datacoord/services.go                |  9 ++----
 internal/datacoord/util.go                    | 12 ++++++--
 10 files changed, 88 insertions(+), 42 deletions(-)

diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 651b26bb0..cd469394a 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -66,6 +66,7 @@ type compactionSignal struct {
 var _ trigger = (*compactionTrigger)(nil)
 
 type compactionTrigger struct {
+	handler           Handler
 	meta              *meta
 	allocator         allocator
 	signals           chan *compactionSignal
@@ -85,6 +86,7 @@ func newCompactionTrigger(
 	allocator allocator,
 	segRefer *SegmentReferenceManager,
 	indexCoord types.IndexCoord,
+	handler Handler,
 ) *compactionTrigger {
 	return &compactionTrigger{
 		meta:              meta,
@@ -94,6 +96,7 @@ func newCompactionTrigger(
 		segRefer:                  segRefer,
 		indexCoord:                indexCoord,
 		estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
+		handler:                   handler,
 	}
 }
 
@@ -168,9 +171,11 @@ func (t *compactionTrigger) allocTs() (Timestamp, error) {
 }
 
 func (t *compactionTrigger) getCompactTime(ts Timestamp, collectionID UniqueID) (*compactTime, error) {
-	coll := t.meta.GetCollection(collectionID)
-	if coll == nil {
-		return nil, fmt.Errorf("collection ID %d not found", collectionID)
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	coll, err := t.handler.GetCollection(ctx, collectionID)
+	if err != nil {
+		return nil, fmt.Errorf("collection ID %d not found, err: %w", collectionID, err)
 	}
 
 	collectionTTL, err := getCollectionTTL(coll.Properties)
@@ -256,8 +261,10 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
 }
 
 func (t *compactionTrigger) estimateDiskSegmentMaxNumOfRows(collectionID UniqueID) (int, error) {
-	collMeta := t.meta.GetCollection(collectionID)
-	if collMeta == nil {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	collMeta, err := t.handler.GetCollection(ctx, collectionID)
+	if err != nil {
 		return -1, fmt.Errorf("failed to get collection %d", collectionID)
 	}
 
@@ -327,7 +334,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
 			break
 		}
 
-		group.segments = FilterInIndexedSegments(t.meta, t.indexCoord, group.segments...)
+		group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
 
 		err := t.updateSegmentMaxSize(group.segments)
 		if err != nil {
@@ -591,7 +598,7 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)
 
 func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
 	segments := t.meta.GetSegmentsByChannel(channel)
-	segments = FilterInIndexedSegments(t.meta, t.indexCoord, segments...)
+	segments = FilterInIndexedSegments(t.handler, t.indexCoord, segments...)
 	var res []*SegmentInfo
 	for _, s := range segments {
 		if !isSegmentHealthy(s) ||
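The substance of the fix is in getCompactTime and estimateDiskSegmentMaxNumOfRows above: instead of dereferencing whatever meta.GetCollection returns (nil once a collection is dropped or not yet loaded into datacoord), the trigger asks the injected Handler, which can reload the collection from RootCoord and surface failure as an error. A minimal sketch of the guard this introduces, assuming the datacoord package context; lookupCollectionTTL is an illustrative name, not a function from the patch:

```go
// Sketch only: the lookup pattern compaction helpers follow after this patch.
// Handler, UniqueID, collectionInfo and getCollectionTTL come from the
// datacoord package; the wrapper itself is hypothetical.
func lookupCollectionTTL(h Handler, collectionID UniqueID) (time.Duration, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	coll, err := h.GetCollection(ctx, collectionID)
	if err != nil {
		// previously t.meta.GetCollection(...) returned nil here and the next
		// field access (coll.Properties) panicked the datacoord process
		return 0, fmt.Errorf("collection ID %d not found, err: %w", collectionID, err)
	}
	return getCollectionTTL(coll.Properties)
}
```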
diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go
index 337d912c0..47d76c2fc 100644
--- a/internal/datacoord/compaction_trigger_test.go
+++ b/internal/datacoord/compaction_trigger_test.go
@@ -345,6 +345,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -370,6 +371,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			}
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -397,6 +399,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			}
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -429,6 +432,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			}
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -461,6 +465,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			}
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -493,6 +498,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			}
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -519,6 +525,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			indexCood := newMockIndexCoord()
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         &FailsAllocator{allocIDSucceed: true},
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -578,6 +585,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 			}
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -772,6 +780,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -946,6 +955,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
 
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -1137,6 +1147,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
 
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -1258,6 +1269,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 
 			tr := &compactionTrigger{
 				meta:              tt.fields.meta,
+				handler:           newMockHandlerWithMeta(tt.fields.meta),
 				allocator:         tt.fields.allocator,
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
@@ -1308,7 +1320,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
 
 	indexCoord := newMockIndexCoord()
 	trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(),
-		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord)
+		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
 
 	// Test too many files.
 	var binlogs []*datapb.FieldBinlog
@@ -1442,7 +1454,7 @@ func Test_newCompactionTrigger(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			indexCoord := newMockIndexCoord()
 			got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator,
-				&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord)
+				&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
 			assert.Equal(t, tt.args.meta, got.meta)
 			assert.Equal(t, tt.args.compactionHandler, got.compactionHandler)
 			assert.Equal(t, tt.args.allocator, got.allocator)
@@ -1454,7 +1466,7 @@ func Test_handleSignal(t *testing.T) {
 
 	indexCoord := newMockIndexCoord()
 	got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(),
-		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord)
+		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
 	signal := &compactionSignal{
 		segmentID: 1,
 	}
@@ -1465,13 +1477,13 @@ func Test_allocTs(t *testing.T) {
 
 	got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(),
-		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
+		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil, newMockHandler())
 	ts, err := got.allocTs()
 	assert.NoError(t, err)
 	assert.True(t, ts > 0)
 
 	got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, &FailsAllocator{},
-		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
+		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil, newMockHandler())
 	ts, err = got.allocTs()
 	assert.Error(t, err)
 	assert.Equal(t, uint64(0), ts)
 }
@@ -1499,7 +1511,11 @@ func Test_getCompactTime(t *testing.T) {
 	m := &meta{segments: NewSegmentsInfo(), collections: collections}
 
 	got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(),
-		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
+		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil, &ServerHandler{
+			&Server{
+				meta: m,
+			},
+		})
 
 	now := tsoutil.GetCurrentTime()
 	ct, err := got.getCompactTime(now, 1)
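Every trigger construction in these tests now passes a handler, but the failure path itself is only exercised indirectly. A hedged sketch of a test that would hit it head-on; errHandler and the test name are hypothetical, not part of the patch, and the standard errors package is assumed to be imported:

```go
// errHandler always fails the lookup; embedding Handler satisfies the rest of
// the interface without implementing it (those methods are never called here).
type errHandler struct{ Handler }

func (errHandler) GetCollection(context.Context, UniqueID) (*collectionInfo, error) {
	return nil, errors.New("collection not found")
}

func Test_getCompactTime_collectionMissing(t *testing.T) {
	tr := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(),
		&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}},
		nil, errHandler{})

	_, err := tr.getCompactTime(tsoutil.GetCurrentTime(), 1)
	// before the patch this lookup ended in a nil dereference; now it is a plain error
	assert.Error(t, err)
}
```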
diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go
index 0b83f9f0e..e56edba0d 100644
--- a/internal/datacoord/garbage_collector.go
+++ b/internal/datacoord/garbage_collector.go
@@ -54,6 +54,7 @@ type GcOption struct {
 
 type garbageCollector struct {
 	option     GcOption
 	meta       *meta
+	handler    Handler
 	segRefer   *SegmentReferenceManager
 	indexCoord types.IndexCoord
@@ -64,11 +65,12 @@ type garbageCollector struct {
 }
 
 // newGarbageCollector create garbage collector with meta and option
-func newGarbageCollector(meta *meta, segRefer *SegmentReferenceManager, indexCoord types.IndexCoord, opt GcOption) *garbageCollector {
+func newGarbageCollector(meta *meta, handler Handler, segRefer *SegmentReferenceManager, indexCoord types.IndexCoord, opt GcOption) *garbageCollector {
 	log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval),
 		zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance))
 	return &garbageCollector{
 		meta:       meta,
+		handler:    handler,
 		segRefer:   segRefer,
 		indexCoord: indexCoord,
 		option:     opt,
@@ -194,7 +196,7 @@ func (gc *garbageCollector) clearEtcd() {
 			droppedCompactTo[to] = struct{}{}
 		}
 	}
-	indexedSegments := FilterInIndexedSegments(gc.meta, gc.indexCoord, lo.Keys(droppedCompactTo)...)
+	indexedSegments := FilterInIndexedSegments(gc.handler, gc.indexCoord, lo.Keys(droppedCompactTo)...)
 	indexedSet := make(typeutil.UniqueSet)
 	for _, segment := range indexedSegments {
 		indexedSet.Insert(segment.GetID())
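The garbage-collector changes are wiring: it now carries the same handler and routes the compact-to filtering in clearEtcd through Handler.GetCollection. A rough usage sketch of the new constructor signature; the option values are illustrative rather than defaults, cli is assumed to be an initialized storage.ChunkManager, and the start/close calls mirror how the existing tests drive the collector:

```go
// meta, handler, segRefer and indexCoord are the dependencies the server already owns.
gc := newGarbageCollector(meta, handler, segRefer, indexCoord, GcOption{
	cli:              cli,              // object storage client used by the scan
	enabled:          true,             // actually run GC rather than dry-run
	checkInterval:    30 * time.Minute, // illustrative cadence
	missingTolerance: 24 * time.Hour,   // illustrative tolerance for unreferenced files
	dropTolerance:    24 * time.Hour,   // illustrative tolerance for dropped segments
})
gc.start()
defer gc.close()
```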
diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go
index 5e83374d1..ddef38c29 100644
--- a/internal/datacoord/garbage_collector_test.go
+++ b/internal/datacoord/garbage_collector_test.go
@@ -59,7 +59,7 @@ func Test_garbageCollector_basic(t *testing.T) {
 	indexCoord := mocks.NewMockIndexCoord(t)
 
 	t.Run("normal gc", func(t *testing.T) {
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:              cli,
			enabled:          true,
			checkInterval:    time.Millisecond * 10,
@@ -75,7 +75,7 @@ func Test_garbageCollector_basic(t *testing.T) {
 	})
 
 	t.Run("with nil cli", func(t *testing.T) {
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:              nil,
			enabled:          true,
			checkInterval:    time.Millisecond * 10,
@@ -137,7 +137,7 @@ func Test_garbageCollector_scan(t *testing.T) {
			2: 1,
		},
	}
-	gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+	gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
		cli:           cli,
		enabled:       true,
		checkInterval: time.Minute * 30,
@@ -158,7 +158,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 	})
 
 	t.Run("missing all but save tolerance", func(t *testing.T) {
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:           cli,
			enabled:       true,
			checkInterval: time.Minute * 30,
@@ -183,7 +183,7 @@ func Test_garbageCollector_scan(t *testing.T) {
		err = meta.AddSegment(segment)
		require.NoError(t, err)
 
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:           cli,
			enabled:       true,
			checkInterval: time.Minute * 30,
@@ -211,7 +211,7 @@ func Test_garbageCollector_scan(t *testing.T) {
		err = meta.AddSegment(segment)
		require.NoError(t, err)
 
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:           cli,
			enabled:       true,
			checkInterval: time.Minute * 30,
@@ -227,7 +227,7 @@ func Test_garbageCollector_scan(t *testing.T) {
		gc.close()
	})
 	t.Run("missing gc all", func(t *testing.T) {
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:           cli,
			enabled:       true,
			checkInterval: time.Minute * 30,
@@ -248,7 +248,7 @@ func Test_garbageCollector_scan(t *testing.T) {
 	})
 
 	t.Run("list object with error", func(t *testing.T) {
-		gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
+		gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
			cli:           cli,
			enabled:       true,
			checkInterval: time.Minute * 30,
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index 26881260c..b1b618089 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
@@ -36,6 +36,7 @@ type Handler interface {
 	GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
 	CheckShouldDropChannel(channel string) bool
 	FinishDropChannel(channel string)
+	GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
 }
 
 // ServerHandler is a helper of Server
@@ -97,8 +98,8 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
 	// use collection start position when segment position is not found
 	if seekPosition == nil {
 		if channel.StartPositions == nil {
-			collection := h.GetCollection(h.s.ctx, channel.CollectionID)
-			if collection != nil {
+			collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
+			if collection != nil && err == nil {
 				seekPosition = getCollectionStartPosition(channel.Name, collection)
 			}
 		} else {
@@ -126,7 +127,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
 		return s.InsertChannel == channel.Name
 	})
 	segmentInfos := make(map[int64]*SegmentInfo)
-	indexedSegments := FilterInIndexedSegments(h.s.meta, h.s.indexCoord, segments...)
+	indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
 	indexed := make(typeutil.UniqueSet)
 	for _, segment := range indexedSegments {
 		indexed.Insert(segment.GetID())
@@ -201,8 +202,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
 	// use collection start position when segment position is not found
 	if seekPosition == nil {
 		if channel.StartPositions == nil {
-			collection := h.GetCollection(h.s.ctx, channel.CollectionID)
-			if collection != nil {
+			collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
+			if collection != nil && err == nil {
 				seekPosition = getCollectionStartPosition(channel.Name, collection)
 			}
 		} else {
@@ -255,17 +256,18 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
 }
 
 // GetCollection returns collection info with specified collection id
-func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) *collectionInfo {
+func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
 	coll := h.s.meta.GetCollection(collectionID)
 	if coll != nil {
-		return coll
+		return coll, nil
 	}
 	err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
 	if err != nil {
 		log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
+		return nil, err
 	}
-	return h.s.meta.GetCollection(collectionID)
+	return h.s.meta.GetCollection(collectionID), nil
 }
 
 // CheckShouldDropChannel returns whether specified channel is marked to be removed
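One behavioural detail of the new ServerHandler.GetCollection is worth spelling out: a RootCoord failure comes back as a non-nil error, but a reload that succeeds while the collection is still absent from meta yields (nil, nil). That is why the two seek-position hunks above check `collection != nil && err == nil`. A small sketch of the defensive pattern, assuming the datacoord package context; the helper name and error message are hypothetical:

```go
// ensureCollectionLoaded illustrates the caller-side contract; not part of the patch.
func ensureCollectionLoaded(ctx context.Context, h Handler, collectionID UniqueID) (*collectionInfo, error) {
	coll, err := h.GetCollection(ctx, collectionID)
	if err != nil {
		return nil, err // the RootCoord lookup itself failed
	}
	if coll == nil {
		// err == nil does not guarantee a usable result
		return nil, fmt.Errorf("collection %d still unknown after reload", collectionID)
	}
	// only now is it safe to touch coll.Schema or coll.Properties
	return coll, nil
}
```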
diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go
index a8cb27385..fe61c3893 100644
--- a/internal/datacoord/mock_test.go
+++ b/internal/datacoord/mock_test.go
@@ -730,6 +730,7 @@ func (m *mockRootCoordService) ListPolicy(ctx context.Context, in *internalpb.Li
 }
 
 type mockHandler struct {
+	meta *meta
 }
 
 func newMockHandler() *mockHandler {
@@ -756,6 +757,20 @@ func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
 
 func (h *mockHandler) FinishDropChannel(channel string) {}
 
+func (h *mockHandler) GetCollection(_ context.Context, collectionID UniqueID) (*collectionInfo, error) {
+	// empty schema
+	if h.meta != nil {
+		return h.meta.GetCollection(collectionID), nil
+	}
+	return &collectionInfo{ID: collectionID}, nil
+}
+
+func newMockHandlerWithMeta(meta *meta) *mockHandler {
+	return &mockHandler{
+		meta: meta,
+	}
+}
+
 type mockIndexCoord struct {
 	types.IndexCoord
 }
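The mock now mirrors both behaviours of the real handler: backed by a meta it answers from that meta (and can therefore hand back a nil collection), without one it fabricates a bare, schema-less collectionInfo so existing tests keep passing. A quick illustration, assumed to sit inside some test with a *testing.T named t and a *meta named m that already holds collection 1; the assertions are hypothetical, not taken from the test file:

```go
// meta-backed mode: answers from the shared test meta
h := newMockHandlerWithMeta(m)
coll, err := h.GetCollection(context.Background(), 1)
assert.NoError(t, err)
assert.NotNil(t, coll)

// stand-alone mode: never consults meta and returns an empty placeholder entry
coll, err = newMockHandler().GetCollection(context.Background(), 2)
assert.NoError(t, err)
assert.EqualValues(t, 2, coll.ID)
```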
diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go
index d8591c041..eabc27ecf 100644
--- a/internal/datacoord/segment_manager.go
+++ b/internal/datacoord/segment_manager.go
@@ -374,6 +374,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
 }
 
 func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error) {
+	// it's ok to use meta.GetCollection here, since collection meta is set before using segmentManager
 	collMeta := s.meta.GetCollection(collectionID)
 	if collMeta == nil {
 		return -1, fmt.Errorf("failed to get collection %d", collectionID)
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 81af50673..22c220834 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -366,7 +366,7 @@ func (s *Server) stopCompactionHandler() {
 }
 
 func (s *Server) createCompactionTrigger() {
-	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord)
+	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord, s.handler)
 }
 
 func (s *Server) stopCompactionTrigger() {
@@ -384,7 +384,7 @@ func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
 }
 
 func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
-	s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, s.indexCoord, GcOption{
+	s.garbageCollector = newGarbageCollector(s.meta, s.handler, s.segReferManager, s.indexCoord, GcOption{
		cli:              cli,
		enabled:          Params.DataCoordCfg.EnableGarbageCollection,
		checkInterval:    Params.DataCoordCfg.GCInterval,
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 4a13f7c0d..fbe898f1f 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -150,12 +150,9 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
			zap.Int64("import task ID", r.GetImportTaskID()))
 
		// Load the collection info from Root Coordinator, if it is not found in server meta.
-		if s.meta.GetCollection(r.GetCollectionID()) == nil {
-			err := s.loadCollectionFromRootCoord(ctx, r.GetCollectionID())
-			if err != nil {
-				log.Warn("failed to load collection in alloc segment", zap.Any("request", r), zap.Error(err))
-				continue
-			}
+		_, err := s.handler.GetCollection(ctx, r.GetCollectionID())
+		if err != nil {
+			log.Warn("cannot get collection schema", zap.Error(err))
		}
 
		// Add the channel to cluster for watching.
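In AssignSegmentID the explicit meta check plus RootCoord fallback collapses into a single handler call, and the old `continue` is gone: a failed lookup is only logged, the request proceeds, and the segment manager's own meta lookup (estimateMaxNumOfRows, deliberately left on meta.GetCollection) is what finally rejects a collection that never became known. A condensed sketch of the per-request flow; ensureCollection is a made-up name for the new behaviour, not a function in the patch:

```go
// hypothetical condensation of the loop body in AssignSegmentID for one request r
func (s *Server) ensureCollection(ctx context.Context, r *datapb.SegmentIDRequest) {
	// one call now covers "look in meta, otherwise load from RootCoord"
	if _, err := s.handler.GetCollection(ctx, r.GetCollectionID()); err != nil {
		// tolerated: allocation continues and fails later with a clear error
		// if the collection really is unknown
		log.Warn("cannot get collection schema", zap.Error(err))
	}
}
```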
diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go
index 10bb0850b..85e08a202 100644
--- a/internal/datacoord/util.go
+++ b/internal/datacoord/util.go
@@ -102,7 +102,7 @@ func GetCompactTime(ctx context.Context, allocator allocator) (*compactTime, err
 	return &compactTime{ttRetentionLogic, 0, 0}, nil
 }
 
-func FilterInIndexedSegments(meta *meta, indexCoord types.IndexCoord, segments ...*SegmentInfo) []*SegmentInfo {
+func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segments ...*SegmentInfo) []*SegmentInfo {
 	if len(segments) == 0 {
 		return nil
 	}
@@ -118,8 +118,14 @@ func FilterInIndexedSegments(meta *meta, indexCoord types.IndexCoord, segments .
 		collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID())
 	}
 	for collection := range collectionSegments {
-		schema := meta.GetCollection(collection).Schema
-		for _, field := range schema.GetFields() {
+		ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
+		coll, err := handler.GetCollection(ctx, collection)
+		cancel()
+		if err != nil {
+			log.Warn("failed to get collection schema", zap.Error(err))
+			continue
+		}
+		for _, field := range coll.Schema.GetFields() {
 			if field.GetDataType() == schemapb.DataType_BinaryVector ||
 				field.GetDataType() == schemapb.DataType_FloatVector {
 				vecFieldID[collection] = field.GetFieldID()
--
GitLab
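FilterInIndexedSegments is where the reported panic actually lived: meta.GetCollection(collection).Schema dereferences a nil pointer as soon as datacoord no longer knows the collection. The rewritten loop bounds every lookup with a two-second context and skips collections it cannot resolve instead of crashing. A compact sketch of that per-collection shape; resolveVectorField is a hypothetical name, and the extra nil check goes slightly beyond the patch, which only tests err:

```go
// sketch of the skip-on-error pattern now used for each collection in the filter
func resolveVectorField(handler Handler, collection UniqueID) (UniqueID, bool) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	coll, err := handler.GetCollection(ctx, collection)
	if err != nil || coll == nil {
		log.Warn("failed to get collection schema", zap.Error(err))
		return 0, false // previously: nil-pointer panic on coll.Schema
	}
	for _, field := range coll.Schema.GetFields() {
		if field.GetDataType() == schemapb.DataType_BinaryVector ||
			field.GetDataType() == schemapb.DataType_FloatVector {
			return field.GetFieldID(), true
		}
	}
	return 0, false
}
```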