From eabdc2b11484e3a53d12671a3900f3384bc9fbf1 Mon Sep 17 00:00:00 2001
From: Xiaofan <83447078+xiaofan-luan@users.noreply.github.com>
Date: Fri, 9 Sep 2022 11:26:35 +0800
Subject: [PATCH] Disable pulsar batch and change background flush goroutine to
 larger interval (#18888)

Signed-off-by: xiaofan-luan
Signed-off-by: xiaofan-luan
---
 internal/mq/msgstream/mq_kafka_msgstream_test.go        | 8 ++++----
 internal/mq/msgstream/mq_msgstream.go                   | 4 ++--
 internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go | 5 +++++
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/internal/mq/msgstream/mq_kafka_msgstream_test.go b/internal/mq/msgstream/mq_kafka_msgstream_test.go
index 51849ed7f..295de387d 100644
--- a/internal/mq/msgstream/mq_kafka_msgstream_test.go
+++ b/internal/mq/msgstream/mq_kafka_msgstream_test.go
@@ -346,10 +346,12 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
 	ctx := context.Background()
 
 	inputStream1 := getKafkaInputStream(ctx, kafkaAddress, p1Channels)
+	defer inputStream1.Close()
 	msgPacks1 := createRandMsgPacks(3, 10, 10)
 	assert.Nil(t, sendMsgPacks(inputStream1, msgPacks1))
 
 	inputStream2 := getKafkaInputStream(ctx, kafkaAddress, p2Channels)
+	defer inputStream2.Close()
 	msgPacks2 := createRandMsgPacks(5, 10, 10)
 	assert.Nil(t, sendMsgPacks(inputStream2, msgPacks2))
 
@@ -365,17 +367,17 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
 		} else {
 			outputStream = getKafkaTtOutputStreamAndSeek(ctx, kafkaAddress, rcvMsgPacks[msgCount-1].EndPositions)
 		}
+		defer outputStream.Close()
 		msgPack := consumer(ctx, outputStream)
 		rcvMsgPacks = append(rcvMsgPacks, msgPack)
 		if len(msgPack.Msgs) > 0 {
 			for _, msg := range msgPack.Msgs {
-				log.Println("msg type: ", msg.Type(), ", msg value: ", msg)
+				log.Println("TestStream_KafkaTtMsgStream_2 msg type: ", msg.Type(), ", msg value: ", msg)
 				assert.Greater(t, msg.BeginTs(), msgPack.BeginTs)
 				assert.LessOrEqual(t, msg.BeginTs(), msgPack.EndTs)
 			}
 			log.Println("================")
 		}
-		outputStream.Close()
 		return len(rcvMsgPacks[msgCount].Msgs)
 	}
 
@@ -387,8 +389,6 @@ func TestStream_KafkaTtMsgStream_2(t *testing.T) {
 	cnt2 := (len(msgPacks2)/2 - 1) * len(msgPacks2[0].Msgs)
 	assert.Equal(t, (cnt1 + cnt2), msgCount)
 
-	inputStream1.Close()
-	inputStream2.Close()
 }
 
 func TestStream_KafkaTtMsgStream_DataNodeTimetickMsgstream(t *testing.T) {
diff --git a/internal/mq/msgstream/mq_msgstream.go b/internal/mq/msgstream/mq_msgstream.go
index 845eae58d..a1ef8856e 100644
--- a/internal/mq/msgstream/mq_msgstream.go
+++ b/internal/mq/msgstream/mq_msgstream.go
@@ -550,7 +550,7 @@ func (ms *mqMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
 			return err
 		}
 
-		log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", messageID))
+		log.Info("MsgStream seek begin", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
 		err = consumer.Seek(messageID, false)
 		if err != nil {
 			log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
@@ -880,7 +880,7 @@ func (ms *MqTtMsgStream) Seek(msgPositions []*internalpb.MsgPosition) error {
 		if err != nil {
 			return err
 		}
-		log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", seekMsgID))
+		log.Info("MsgStream begin to seek start msg: ", zap.String("channel", mp.ChannelName), zap.Any("MessageID", mp.MsgID))
 		err = consumer.Seek(seekMsgID, true)
 		if err != nil {
 			log.Warn("Failed to seek", zap.String("channel", mp.ChannelName), zap.Error(err))
diff --git a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go
index 39a618681..b4aeb7b7b 100644
--- a/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go
+++ b/internal/mq/msgstream/mqwrapper/pulsar/pulsar_client.go
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"strings"
 	"sync"
+	"time"
 
 	"github.com/apache/pulsar-client-go/pulsar"
 	"github.com/milvus-io/milvus/internal/log"
@@ -57,6 +58,10 @@ func (pc *pulsarClient) CreateProducer(options mqwrapper.ProducerOptions) (mqwra
 		opts.CompressionType = pulsar.ZSTD
 		opts.CompressionLevel = pulsar.Faster
 	}
+	// disable automatic batching
+	opts.DisableBatching = true
+	// change the batching max publish delay higher to avoid extra cpu consumption
+	opts.BatchingMaxPublishDelay = 1 * time.Minute
 	pp, err := pc.client.CreateProducer(opts)
 	if err != nil {
-- 
GitLab
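
Note (not part of the patch): the pulsar_client.go change only sets two extra fields on pulsar.ProducerOptions before the producer is created. The standalone Go sketch below shows the same options applied directly through the pulsar-client-go API, outside of Milvus. The broker URL ("pulsar://localhost:6650"), topic name ("my-topic"), and payload are placeholder assumptions, not values taken from the Milvus code.

package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	// Placeholder broker URL; adjust for your deployment.
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Mirror the options set in the patch: batching disabled, and the
	// batching max publish delay raised so the client's background flush
	// goroutine wakes up far less often (the motivation stated in the
	// commit subject and code comments).
	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                   "my-topic", // placeholder topic
		DisableBatching:         true,
		BatchingMaxPublishDelay: 1 * time.Minute,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// With batching disabled, each Send is published as an individual message.
	if _, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
		Payload: []byte("hello"),
	}); err != nil {
		log.Fatal(err)
	}
}

Per the commit message, with batching disabled the delay no longer governs how long messages wait in a batch; raising it mainly reduces how often the client's internal flush goroutine ticks, which is where the extra CPU consumption came from.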