未验证 提交 27808850 编写于 作者: C congqixia 提交者: GitHub

Check leader ready before transfer load/release (#23190)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 051d8027
......@@ -28,6 +28,17 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen
)
log.Info("LoadSegment start to transfer load with shard cluster")
_, err := node.queryShardService.getQueryShard(shard)
if err != nil {
log.Warn("TransferLoad failed, failed to get query shard",
zap.String("vChannel", shard),
zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NotShardLeader,
Reason: err.Error(),
}, nil
}
shardCluster, ok := node.ShardClusterService.getShardCluster(shard)
if !ok {
log.Warn("TransferLoad failed to find shard cluster")
......@@ -38,7 +49,7 @@ func (node *QueryNode) TransferLoad(ctx context.Context, req *querypb.LoadSegmen
}
req.NeedTransfer = false
err := shardCluster.LoadSegments(ctx, req)
err = shardCluster.LoadSegments(ctx, req)
if err != nil {
if errors.Is(err, ErrInsufficientMemory) {
log.Warn("insufficient memory when shard cluster load segments", zap.Error(err))
......@@ -71,6 +82,18 @@ func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.Release
log.Info("ReleaseSegments start to transfer release with shard cluster")
shard := req.GetShard()
_, err := node.queryShardService.getQueryShard(shard)
if err != nil {
log.Warn("TransferRelease failed, failed to get query shard",
zap.String("vChannel", shard),
zap.Error(err))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NotShardLeader,
Reason: err.Error(),
}, nil
}
shardCluster, ok := node.ShardClusterService.getShardCluster(req.GetShard())
if !ok {
log.Warn("TransferLoad failed to find shard cluster")
......@@ -81,7 +104,7 @@ func (node *QueryNode) TransferRelease(ctx context.Context, req *querypb.Release
}
req.NeedTransfer = false
err := shardCluster.ReleaseSegments(ctx, req, false)
err = shardCluster.ReleaseSegments(ctx, req, false)
if err != nil {
log.Warn("shard cluster failed to release segments", zap.Error(err))
return &commonpb.Status{
......
......@@ -42,6 +42,10 @@ func (s *ImplUtilsSuite) SetupSuite() {
s.querynode.UpdateStateCode(commonpb.StateCode_Healthy)
s.querynode.ShardClusterService = newShardClusterService(client, s.querynode.session, s.querynode)
s.querynode.queryShardService = &queryShardService{
cancel: func() {},
queryShards: make(map[string]*queryShard),
}
}
func (s *ImplUtilsSuite) TearDownSuite() {
......@@ -62,6 +66,7 @@ func (s *ImplUtilsSuite) SetupTest() {
}, &mockSegmentDetector{}, buildMockQueryNode)
s.querynode.ShardClusterService.clusters.Store(defaultChannelName, cs)
s.querynode.queryShardService.queryShards[defaultChannelName] = &queryShard{}
cs.SetupFirstVersion()
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册