From a3c176045df87d5a49c6f92cd40cd1d0fa71eb59 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Sun, 13 Aug 2023 16:59:30 +0800 Subject: [PATCH] Fix kafka producer init many times (#26314) Signed-off-by: Enwei Jiao --- .../msgstream/mqwrapper/kafka/kafka_client.go | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index 0ea3e8c95..e12ac1f58 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -6,16 +6,21 @@ import ( "sync" "github.com/confluentinc/confluent-kafka-go/kafka" + "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" ) -var producer *kafka.Producer +var ( + producer atomic.Pointer[kafka.Producer] + sf conc.Singleflight[*kafka.Producer] +) var once sync.Once @@ -85,15 +90,21 @@ func cloneKafkaConfig(config kafka.ConfigMap) *kafka.ConfigMap { } func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) { - config := kc.newProducerConfig() - producer, err := kafka.NewProducer(config) - if err != nil { - log.Error("create sync kafka producer failed", zap.Error(err)) - return nil, err + if p := producer.Load(); p != nil { + return p, nil } - once.Do(func() { + p, err, _ := sf.Do("kafka_producer", func() (*kafka.Producer, error) { + if p := producer.Load(); p != nil { + return p, nil + } + config := kc.newProducerConfig() + p, err := kafka.NewProducer(config) + if err != nil { + log.Error("create sync kafka producer failed", zap.Error(err)) + return nil, err + } go func() { - for e := range producer.Events() { + for e := range p.Events() { switch ev := e.(type) { case kafka.Error: // Generic client instance-level errors, such as broker connection failures, @@ -109,9 +120,13 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) { } } }() + producer.Store(p) + return p, nil }) - - return producer, nil + if err != nil { + return nil, err + } + return p, nil } func (kc *kafkaClient) newProducerConfig() *kafka.ConfigMap { -- GitLab