未验证 提交 1e61112b 编写于 作者: C congqixia 提交者: GitHub

Add retry logic in pulsar consumer unsubscribe (#15284)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 524d4126
......@@ -143,8 +143,6 @@ func TestDataNode(t *testing.T) {
err = node1.Start()
assert.Nil(t, err)
defer func() {
// TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar
time.Sleep(200 * time.Millisecond)
err := node1.Stop()
assert.Nil(t, err)
}()
......@@ -338,8 +336,6 @@ func TestDataNode(t *testing.T) {
if i <= 2 {
err = node.flowgraphManager.addAndStart(node, &datapb.VchannelInfo{CollectionID: 1, ChannelName: test.dmChannelName})
assert.Nil(t, err)
// TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar
time.Sleep(200 * time.Millisecond)
vchanNameCh <- test.dmChannelName
}
}
......@@ -413,8 +409,6 @@ func TestWatchChannel(t *testing.T) {
exist := node.flowgraphManager.exist(ch)
assert.True(t, exist)
// TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar
time.Sleep(200 * time.Millisecond)
err = kv.RemoveWithPrefix(fmt.Sprintf("%s/%d", Params.DataNodeCfg.ChannelWatchSubPath, node.NodeID))
assert.Nil(t, err)
//TODO there is not way to sync Release done, use sleep for now
......
......@@ -19,7 +19,6 @@ package datanode
import (
"context"
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -46,8 +45,6 @@ func TestFlowGraphManager(t *testing.T) {
fm := newFlowgraphManager()
defer func() {
// TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar
time.Sleep(200 * time.Millisecond)
fm.dropAll()
}()
t.Run("Test addAndStart", func(t *testing.T) {
......@@ -62,8 +59,6 @@ func TestFlowGraphManager(t *testing.T) {
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
// TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar
time.Sleep(200 * time.Millisecond)
fm.dropAll()
})
......@@ -79,8 +74,6 @@ func TestFlowGraphManager(t *testing.T) {
assert.NoError(t, err)
assert.True(t, fm.exist(vchanName))
// TODO: wait for reconnecting to Pulsar, delete sleep after Seek wouldn't lead to disconnect with Pulsar
time.Sleep(200 * time.Millisecond)
fm.release(vchanName)
assert.False(t, fm.exist(vchanName))
......
......@@ -17,11 +17,15 @@
package mqclient
import (
"context"
"sync"
"time"
"unsafe"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap"
)
// PulsarConsumer consumes from pulsar
......@@ -108,8 +112,12 @@ func (pc *PulsarConsumer) Close() {
pc.closeOnce.Do(func() {
defer pc.c.Close()
// Unsubscribe for the consumer
err := pc.c.Unsubscribe()
err := retry.Do(context.Background(), func() error {
//TODO need to check error retryable
return pc.c.Unsubscribe()
}, retry.MaxSleepTime(50*time.Millisecond), retry.Attempts(3))
if err != nil {
log.Error("failed to unsubscribe", zap.String("subscription", pc.Subscription()), zap.Error(err))
panic(err)
}
close(pc.closeCh)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册