From b31f93730178be1c6e5e5638958f1566b19060bc Mon Sep 17 00:00:00 2001 From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com> Date: Tue, 5 Jul 2022 10:12:20 +0800 Subject: [PATCH] Fix unsubscribe with subscription not exist (#18042) Signed-off-by: xiaofan-luan --- internal/mq/msgstream/mq_factory.go | 13 +++++-- internal/mq/msgstream/msgstream_util_test.go | 38 ++++++++++++++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) create mode 100644 internal/mq/msgstream/msgstream_util_test.go diff --git a/internal/mq/msgstream/mq_factory.go b/internal/mq/msgstream/mq_factory.go index 1d9d29a4e..80e86174b 100644 --- a/internal/mq/msgstream/mq_factory.go +++ b/internal/mq/msgstream/mq_factory.go @@ -18,8 +18,7 @@ package msgstream import ( "context" - - "go.uber.org/zap" + "strings" "github.com/apache/pulsar-client-go/pulsar" "github.com/milvus-io/milvus/internal/log" @@ -29,8 +28,11 @@ import ( rmqwrapper "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper/rmq" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/retry" + "github.com/streamnative/pulsarctl/pkg/cli" "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/pulsarctl/pkg/pulsar/utils" + + "go.uber.org/zap" ) // PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go) @@ -87,6 +89,13 @@ func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st } err = admin.Subscriptions().Delete(*topic, subname, true) if err != nil { + pulsarErr, ok := err.(cli.Error) + if ok { + // subscription not found, ignore error + if strings.Contains(pulsarErr.Reason, "Subscription not found") { + return nil + } + } log.Warn("failed to clean up subscriptions", zap.String("pulsar web", f.PulsarWebAddress), zap.String("topic", channel), zap.Any("subname", subname), zap.Error(err)) // fallback to original way diff --git a/internal/mq/msgstream/msgstream_util_test.go b/internal/mq/msgstream/msgstream_util_test.go new file mode 100644 index 000000000..feed993c6 --- /dev/null +++ b/internal/mq/msgstream/msgstream_util_test.go @@ -0,0 +1,38 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package msgstream + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPulsarMsgUtil(t *testing.T) { + pmsFactory := NewPmsFactory(&Params.PulsarCfg) + + ctx := context.Background() + msgStream, err := pmsFactory.NewMsgStream(ctx) + assert.Nil(t, err) + defer msgStream.Close() + + // create a topic + msgStream.AsProducer([]string{"test"}) + + UnsubscribeChannels(ctx, pmsFactory, "sub", []string{"test"}) +} -- GitLab