Unverified commit 2a2d32c4, authored by congqixia, committed by GitHub

Fix datacoord panics when collection info not found (#19706)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
Parent 10c03de6
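
For context, the change swaps direct `meta.GetCollection` lookups, which return nil when the collection is not cached locally, for `Handler.GetCollection`, which can lazily load the collection from RootCoord and reports failure through an error instead of a nil result. A minimal before/after sketch of the pattern, using simplified placeholder types rather than the actual Milvus definitions:

```go
package sketch

import (
	"context"
	"fmt"
	"time"
)

// Simplified stand-in for datacoord's collectionInfo; fields reduced for illustration.
type collectionInfo struct {
	ID         int64
	Properties map[string]string
}

// Handler mirrors the new interface method added in this commit: it resolves a
// collection and signals absence via an error rather than a nil pointer.
type Handler interface {
	GetCollection(ctx context.Context, collectionID int64) (*collectionInfo, error)
}

// Before the fix: the result of a local meta lookup was dereferenced directly,
// which panics when the collection info has not been loaded yet.
func compactTimeBefore(meta map[int64]*collectionInfo, collectionID int64) (map[string]string, error) {
	coll := meta[collectionID]
	return coll.Properties, nil // nil-pointer dereference if coll == nil
}

// After the fix: resolve through the handler under a bounded context and
// propagate the error so callers can back off instead of crashing.
func compactTimeAfter(h Handler, collectionID int64) (map[string]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	coll, err := h.GetCollection(ctx, collectionID)
	if err != nil {
		return nil, fmt.Errorf("collection ID %d not found, err: %w", collectionID, err)
	}
	return coll.Properties, nil
}
```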
......@@ -66,6 +66,7 @@ type compactionSignal struct {
var _ trigger = (*compactionTrigger)(nil)
type compactionTrigger struct {
handler Handler
meta *meta
allocator allocator
signals chan *compactionSignal
......@@ -85,6 +86,7 @@ func newCompactionTrigger(
allocator allocator,
segRefer *SegmentReferenceManager,
indexCoord types.IndexCoord,
handler Handler,
) *compactionTrigger {
return &compactionTrigger{
meta: meta,
......@@ -94,6 +96,7 @@ func newCompactionTrigger(
segRefer: segRefer,
indexCoord: indexCoord,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
handler: handler,
}
}
......@@ -168,9 +171,11 @@ func (t *compactionTrigger) allocTs() (Timestamp, error) {
}
func (t *compactionTrigger) getCompactTime(ts Timestamp, collectionID UniqueID) (*compactTime, error) {
coll := t.meta.GetCollection(collectionID)
if coll == nil {
return nil, fmt.Errorf("collection ID %d not found", collectionID)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
coll, err := t.handler.GetCollection(ctx, collectionID)
if err != nil {
return nil, fmt.Errorf("collection ID %d not found, err: %w", collectionID, err)
}
collectionTTL, err := getCollectionTTL(coll.Properties)
......@@ -256,8 +261,10 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) {
}
func (t *compactionTrigger) estimateDiskSegmentMaxNumOfRows(collectionID UniqueID) (int, error) {
collMeta := t.meta.GetCollection(collectionID)
if collMeta == nil {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
collMeta, err := t.handler.GetCollection(ctx, collectionID)
if err != nil {
return -1, fmt.Errorf("failed to get collection %d", collectionID)
}
......@@ -327,7 +334,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
break
}
group.segments = FilterInIndexedSegments(t.meta, t.indexCoord, group.segments...)
group.segments = FilterInIndexedSegments(t.handler, t.indexCoord, group.segments...)
err := t.updateSegmentMaxSize(group.segments)
if err != nil {
......@@ -591,7 +598,7 @@ func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int)
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
segments := t.meta.GetSegmentsByChannel(channel)
segments = FilterInIndexedSegments(t.meta, t.indexCoord, segments...)
segments = FilterInIndexedSegments(t.handler, t.indexCoord, segments...)
var res []*SegmentInfo
for _, s := range segments {
if !isSegmentHealthy(s) ||
......
......@@ -345,6 +345,7 @@ func Test_compactionTrigger_force(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -370,6 +371,7 @@ func Test_compactionTrigger_force(t *testing.T) {
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -397,6 +399,7 @@ func Test_compactionTrigger_force(t *testing.T) {
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -429,6 +432,7 @@ func Test_compactionTrigger_force(t *testing.T) {
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -461,6 +465,7 @@ func Test_compactionTrigger_force(t *testing.T) {
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -493,6 +498,7 @@ func Test_compactionTrigger_force(t *testing.T) {
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -519,6 +525,7 @@ func Test_compactionTrigger_force(t *testing.T) {
indexCood := newMockIndexCoord()
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: &FailsAllocator{allocIDSucceed: true},
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -578,6 +585,7 @@ func Test_compactionTrigger_force(t *testing.T) {
}
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -772,6 +780,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -946,6 +955,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -1137,6 +1147,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -1258,6 +1269,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
tr := &compactionTrigger{
meta: tt.fields.meta,
handler: newMockHandlerWithMeta(tt.fields.meta),
allocator: tt.fields.allocator,
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
......@@ -1308,7 +1320,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
indexCoord := newMockIndexCoord()
trigger := newCompactionTrigger(&meta{}, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord)
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
// Test too many files.
var binlogs []*datapb.FieldBinlog
......@@ -1442,7 +1454,7 @@ func Test_newCompactionTrigger(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
indexCoord := newMockIndexCoord()
got := newCompactionTrigger(tt.args.meta, tt.args.compactionHandler, tt.args.allocator,
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord)
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
assert.Equal(t, tt.args.meta, got.meta)
assert.Equal(t, tt.args.compactionHandler, got.compactionHandler)
assert.Equal(t, tt.args.allocator, got.allocator)
......@@ -1454,7 +1466,7 @@ func Test_handleSignal(t *testing.T) {
indexCoord := newMockIndexCoord()
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord)
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, indexCoord, newMockHandler())
signal := &compactionSignal{
segmentID: 1,
}
......@@ -1465,13 +1477,13 @@ func Test_handleSignal(t *testing.T) {
func Test_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil, newMockHandler())
ts, err := got.allocTs()
assert.NoError(t, err)
assert.True(t, ts > 0)
got = newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, &FailsAllocator{},
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil, newMockHandler())
ts, err = got.allocTs()
assert.Error(t, err)
assert.Equal(t, uint64(0), ts)
......@@ -1499,7 +1511,11 @@ func Test_getCompactTime(t *testing.T) {
m := &meta{segments: NewSegmentsInfo(), collections: collections}
got := newCompactionTrigger(m, &compactionPlanHandler{}, newMockAllocator(),
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil)
&SegmentReferenceManager{segmentsLock: map[UniqueID]map[UniqueID]*datapb.SegmentReferenceLock{}}, nil, &ServerHandler{
&Server{
meta: m,
},
})
now := tsoutil.GetCurrentTime()
ct, err := got.getCompactTime(now, 1)
......
......@@ -54,6 +54,7 @@ type GcOption struct {
type garbageCollector struct {
option GcOption
meta *meta
handler Handler
segRefer *SegmentReferenceManager
indexCoord types.IndexCoord
......@@ -64,11 +65,12 @@ type garbageCollector struct {
}
// newGarbageCollector create garbage collector with meta and option
func newGarbageCollector(meta *meta, segRefer *SegmentReferenceManager, indexCoord types.IndexCoord, opt GcOption) *garbageCollector {
func newGarbageCollector(meta *meta, handler Handler, segRefer *SegmentReferenceManager, indexCoord types.IndexCoord, opt GcOption) *garbageCollector {
log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval),
zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance))
return &garbageCollector{
meta: meta,
handler: handler,
segRefer: segRefer,
indexCoord: indexCoord,
option: opt,
......@@ -194,7 +196,7 @@ func (gc *garbageCollector) clearEtcd() {
droppedCompactTo[to] = struct{}{}
}
}
indexedSegments := FilterInIndexedSegments(gc.meta, gc.indexCoord, lo.Keys(droppedCompactTo)...)
indexedSegments := FilterInIndexedSegments(gc.handler, gc.indexCoord, lo.Keys(droppedCompactTo)...)
indexedSet := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
indexedSet.Insert(segment.GetID())
......
......@@ -59,7 +59,7 @@ func Test_garbageCollector_basic(t *testing.T) {
indexCoord := mocks.NewMockIndexCoord(t)
t.Run("normal gc", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Millisecond * 10,
......@@ -75,7 +75,7 @@ func Test_garbageCollector_basic(t *testing.T) {
})
t.Run("with nil cli", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: nil,
enabled: true,
checkInterval: time.Millisecond * 10,
......@@ -137,7 +137,7 @@ func Test_garbageCollector_scan(t *testing.T) {
2: 1,
},
}
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
......@@ -158,7 +158,7 @@ func Test_garbageCollector_scan(t *testing.T) {
})
t.Run("missing all but save tolerance", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
......@@ -183,7 +183,7 @@ func Test_garbageCollector_scan(t *testing.T) {
err = meta.AddSegment(segment)
require.NoError(t, err)
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
......@@ -211,7 +211,7 @@ func Test_garbageCollector_scan(t *testing.T) {
err = meta.AddSegment(segment)
require.NoError(t, err)
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
......@@ -227,7 +227,7 @@ func Test_garbageCollector_scan(t *testing.T) {
gc.close()
})
t.Run("missing gc all", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
......@@ -248,7 +248,7 @@ func Test_garbageCollector_scan(t *testing.T) {
})
t.Run("list object with error", func(t *testing.T) {
gc := newGarbageCollector(meta, segRefer, indexCoord, GcOption{
gc := newGarbageCollector(meta, newMockHandler(), segRefer, indexCoord, GcOption{
cli: cli,
enabled: true,
checkInterval: time.Minute * 30,
......
......@@ -36,6 +36,7 @@ type Handler interface {
GetDataVChanPositions(channel *channel, partitionID UniqueID) *datapb.VchannelInfo
CheckShouldDropChannel(channel string) bool
FinishDropChannel(channel string)
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
}
// ServerHandler is a helper of Server
......@@ -97,8 +98,8 @@ func (h *ServerHandler) GetDataVChanPositions(channel *channel, partitionID Uniq
// use collection start position when segment position is not found
if seekPosition == nil {
if channel.StartPositions == nil {
collection := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil {
collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil && err == nil {
seekPosition = getCollectionStartPosition(channel.Name, collection)
}
} else {
......@@ -126,7 +127,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
return s.InsertChannel == channel.Name
})
segmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h.s.meta, h.s.indexCoord, segments...)
indexedSegments := FilterInIndexedSegments(h, h.s.indexCoord, segments...)
indexed := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
indexed.Insert(segment.GetID())
......@@ -201,8 +202,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionID Uni
// use collection start position when segment position is not found
if seekPosition == nil {
if channel.StartPositions == nil {
collection := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil {
collection, err := h.GetCollection(h.s.ctx, channel.CollectionID)
if collection != nil && err == nil {
seekPosition = getCollectionStartPosition(channel.Name, collection)
}
} else {
......@@ -255,17 +256,18 @@ func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
}
// GetCollection returns collection info with specified collection id
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) *collectionInfo {
func (h *ServerHandler) GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) {
coll := h.s.meta.GetCollection(collectionID)
if coll != nil {
return coll
return coll, nil
}
err := h.s.loadCollectionFromRootCoord(ctx, collectionID)
if err != nil {
log.Warn("failed to load collection from rootcoord", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
return h.s.meta.GetCollection(collectionID)
return h.s.meta.GetCollection(collectionID), nil
}
// CheckShouldDropChannel returns whether specified channel is marked to be removed
......
......@@ -730,6 +730,7 @@ func (m *mockRootCoordService) ListPolicy(ctx context.Context, in *internalpb.Li
}
type mockHandler struct {
meta *meta
}
func newMockHandler() *mockHandler {
......@@ -756,6 +757,20 @@ func (h *mockHandler) CheckShouldDropChannel(channel string) bool {
func (h *mockHandler) FinishDropChannel(channel string) {}
func (h *mockHandler) GetCollection(_ context.Context, collectionID UniqueID) (*collectionInfo, error) {
// empty schema
if h.meta != nil {
return h.meta.GetCollection(collectionID), nil
}
return &collectionInfo{ID: collectionID}, nil
}
func newMockHandlerWithMeta(meta *meta) *mockHandler {
return &mockHandler{
meta: meta,
}
}
type mockIndexCoord struct {
types.IndexCoord
}
......
......@@ -374,6 +374,7 @@ func (s *SegmentManager) openNewSegment(ctx context.Context, collectionID Unique
}
func (s *SegmentManager) estimateMaxNumOfRows(collectionID UniqueID) (int, error) {
// it's ok to use meta.GetCollection here, since collection meta is set before using segmentManager
collMeta := s.meta.GetCollection(collectionID)
if collMeta == nil {
return -1, fmt.Errorf("failed to get collection %d", collectionID)
......
......@@ -366,7 +366,7 @@ func (s *Server) stopCompactionHandler() {
}
func (s *Server) createCompactionTrigger() {
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.segReferManager, s.indexCoord, s.handler)
}
func (s *Server) stopCompactionTrigger() {
......@@ -384,7 +384,7 @@ func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
}
func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
s.garbageCollector = newGarbageCollector(s.meta, s.segReferManager, s.indexCoord, GcOption{
s.garbageCollector = newGarbageCollector(s.meta, s.handler, s.segReferManager, s.indexCoord, GcOption{
cli: cli,
enabled: Params.DataCoordCfg.EnableGarbageCollection,
checkInterval: Params.DataCoordCfg.GCInterval,
......
......@@ -150,12 +150,9 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.Int64("import task ID", r.GetImportTaskID()))
// Load the collection info from Root Coordinator, if it is not found in server meta.
if s.meta.GetCollection(r.GetCollectionID()) == nil {
err := s.loadCollectionFromRootCoord(ctx, r.GetCollectionID())
if err != nil {
log.Warn("failed to load collection in alloc segment", zap.Any("request", r), zap.Error(err))
continue
}
_, err := s.handler.GetCollection(ctx, r.GetCollectionID())
if err != nil {
log.Warn("cannot get collection schema", zap.Error(err))
}
// Add the channel to cluster for watching.
......
......@@ -102,7 +102,7 @@ func GetCompactTime(ctx context.Context, allocator allocator) (*compactTime, err
return &compactTime{ttRetentionLogic, 0, 0}, nil
}
func FilterInIndexedSegments(meta *meta, indexCoord types.IndexCoord, segments ...*SegmentInfo) []*SegmentInfo {
func FilterInIndexedSegments(handler Handler, indexCoord types.IndexCoord, segments ...*SegmentInfo) []*SegmentInfo {
if len(segments) == 0 {
return nil
}
......@@ -118,8 +118,14 @@ func FilterInIndexedSegments(meta *meta, indexCoord types.IndexCoord, segments .
collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID())
}
for collection := range collectionSegments {
schema := meta.GetCollection(collection).Schema
for _, field := range schema.GetFields() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
coll, err := handler.GetCollection(ctx, collection)
cancel()
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
continue
}
for _, field := range coll.Schema.GetFields() {
if field.GetDataType() == schemapb.DataType_BinaryVector ||
field.GetDataType() == schemapb.DataType_FloatVector {
vecFieldID[collection] = field.GetFieldID()
......
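
The same pattern hardens `FilterInIndexedSegments`: the old code dereferenced `meta.GetCollection(collection).Schema` unconditionally, which panics when the collection is absent, while the new code fetches the collection through the handler under a two-second timeout and skips that collection with a warning on error. A self-contained sketch of the guarded loop, with placeholder types and the standard-library logger standing in for the real schemapb types and zap:

```go
package sketch

import (
	"context"
	"log"
	"time"
)

// Placeholder schema types for illustration; the real code uses schemapb.
type fieldSchema struct {
	FieldID  int64
	DataType string // e.g. "FloatVector", "BinaryVector"
}

type collInfo struct {
	Schema []fieldSchema
}

type collectionGetter interface {
	GetCollection(ctx context.Context, collectionID int64) (*collInfo, error)
}

// vectorFieldIDs records the vector field of each collection, skipping any
// collection whose info cannot be fetched instead of panicking on a nil result.
func vectorFieldIDs(h collectionGetter, collectionIDs []int64) map[int64]int64 {
	vecFieldID := make(map[int64]int64)
	for _, collection := range collectionIDs {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		coll, err := h.GetCollection(ctx, collection)
		cancel()
		if err != nil {
			log.Printf("failed to get collection %d schema: %v", collection, err)
			continue // skip this collection rather than dereferencing a nil schema
		}
		for _, field := range coll.Schema {
			if field.DataType == "FloatVector" || field.DataType == "BinaryVector" {
				vecFieldID[collection] = field.FieldID
			}
		}
	}
	return vecFieldID
}
```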