未验证 提交 abd250da 编写于 作者: Y yah01 提交者: GitHub

Fix QueryNode is not able to recovery (#24300)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 7f96dd85
......@@ -226,6 +226,7 @@ message WatchDmChannelsRequest {
// for node down load balance, need to remove offline node in time after every watchDmChannel finish.
int64 offlineNodeID = 11;
int64 version = 12;
repeated index.IndexInfo index_info_list = 13;
}
message UnsubDmChannelRequest {
......
......@@ -222,6 +222,32 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
log.Info("channel already subscribed")
return util.SuccessStatus(), nil
}
fieldIndexMetas := make([]*segcorepb.FieldIndexMeta, 0)
for _, info := range req.GetIndexInfoList() {
fieldIndexMetas = append(fieldIndexMetas, &segcorepb.FieldIndexMeta{
CollectionID: info.GetCollectionID(),
FieldID: info.GetFieldID(),
IndexName: info.GetIndexName(),
TypeParams: info.GetTypeParams(),
IndexParams: info.GetIndexParams(),
IsAutoIndex: info.GetIsAutoIndex(),
UserIndexParams: info.GetUserIndexParams(),
})
}
sizePerRecord, err := typeutil.EstimateSizePerRecord(req.Schema)
maxIndexRecordPerSegment := int64(0)
if err != nil || sizePerRecord == 0 {
log.Warn("failed to transfer segment size to collection, because failed to estimate size per record", zap.Error(err))
} else {
threshold := paramtable.Get().DataCoordCfg.SegmentMaxSize.GetAsFloat() * 1024 * 1024
proportion := paramtable.Get().DataCoordCfg.SegmentSealProportion.GetAsFloat()
maxIndexRecordPerSegment = int64(threshold * proportion / float64(sizePerRecord))
}
node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), &segcorepb.CollectionIndexMeta{
IndexMetas: fieldIndexMetas,
MaxIndexRowCount: maxIndexRecordPerSegment,
}, req.GetLoadMeta())
delegator, err := delegator.NewShardDelegator(req.GetCollectionID(), req.GetReplicaID(), channel.GetChannelName(), req.GetVersion(),
node.clusterManager, node.manager, node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp())
if err != nil {
......@@ -430,6 +456,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
return node.loadDeltaLogs(ctx, req), nil
}
node.manager.Collection.Put(req.GetCollectionID(), req.GetSchema(), nil, req.GetLoadMeta())
// Delegates request to workers
if req.GetNeedTransfer() {
delegator, ok := node.delegators.Get(segment.GetInsertChannel())
......
......@@ -53,6 +53,7 @@ type ServiceSuite struct {
msgChan chan *msgstream.MsgPack
collectionID int64
collectionName string
schema *schemapb.CollectionSchema
partitionIDs []int64
// Test segments
validSegmentIDs []int64
......@@ -124,15 +125,6 @@ func (suite *ServiceSuite) SetupTest() {
// start node
err = suite.node.Start()
suite.NoError(err)
// init collection
schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64)
LoadMeta := &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
}
suite.node.manager.Collection.Put(suite.collectionID, schema, nil, LoadMeta)
}
func (suite *ServiceSuite) TearDownTest() {
......@@ -250,6 +242,12 @@ func (suite *ServiceSuite) TestWatchDmChannelsInt64() {
DroppedSegmentIds: suite.droppedSegmentIDs,
},
},
Schema: segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64),
LoadMeta: &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
},
}
// mocks
......@@ -292,6 +290,12 @@ func (suite *ServiceSuite) TestWatchDmChannelsVarchar() {
DroppedSegmentIds: suite.droppedSegmentIDs,
},
},
Schema: segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_VarChar),
LoadMeta: &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
CollectionID: suite.collectionID,
PartitionIDs: suite.partitionIDs,
},
}
// mocks
......@@ -334,6 +338,7 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
DroppedSegmentIds: suite.droppedSegmentIDs,
},
},
Schema: segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64),
}
// init msgstream failed
......@@ -470,6 +475,7 @@ func (suite *ServiceSuite) TestLoadSegments_Int64() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
NeedTransfer: true,
}
......@@ -500,6 +506,7 @@ func (suite *ServiceSuite) TestLoadSegments_VarChar() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
DeltaPositions: []*msgpb.MsgPosition{{Timestamp: 20000}},
NeedTransfer: true,
}
......@@ -523,6 +530,7 @@ func (suite *ServiceSuite) TestLoadDeltaInt64() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
LoadScope: querypb.LoadScope_Delta,
}
......@@ -546,6 +554,7 @@ func (suite *ServiceSuite) TestLoadDeltaVarchar() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
LoadScope: querypb.LoadScope_Delta,
}
......@@ -568,6 +577,7 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
}
......@@ -609,6 +619,7 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
}
......@@ -629,6 +640,7 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
}
......@@ -654,6 +666,7 @@ func (suite *ServiceSuite) TestLoadSegments_Transfer() {
CollectionID: suite.collectionID,
DstNodeID: suite.node.session.ServerID,
Infos: suite.genSegmentLoadInfos(schema),
Schema: schema,
NeedTransfer: true,
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册