diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index db85cb7c3eef44d069803b07e544abf45ee4ce00..11877fa0b463600a2f5185fab281332654d916c1 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -48,7 +48,7 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) { assert.Equal(t, "333", msg.Properties()[common.TraceIDKey]) assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID) assert.Equal(t, topic, msg.Topic()) - assert.True(t, len(msg.Properties()) == 0) + assert.True(t, len(msg.Properties()) == 1) } func TestKafkaConsumer_SeekInclusive(t *testing.T) { @@ -74,7 +74,7 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) { assert.Equal(t, "222", msg.Properties()[common.TraceIDKey]) assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID) assert.Equal(t, topic, msg.Topic()) - assert.True(t, len(msg.Properties()) == 0) + assert.True(t, len(msg.Properties()) == 1) } func TestKafkaConsumer_GetSeek(t *testing.T) { diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go index 8c0af9c06ec988efbcdbb12e7dfa3567417316ae..a4a498f2370b19cf508f6d8bca02a1cb213221f6 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_message.go @@ -2,7 +2,6 @@ package kafka import ( "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" ) @@ -15,15 +14,9 @@ func (km *kafkaMessage) Topic() string { } func (km *kafkaMessage) Properties() map[string]string { - if len(km.msg.Headers) == 0 { - return nil - } - var properties map[string]string - for i := 0; i < len(km.msg.Headers); i++ { - if _, ok := properties[km.msg.Headers[i].Key]; ok { - log.Info("Repeated key in kafka message headers") - } - properties[km.msg.Headers[i].Key] = string(km.msg.Headers[i].Value) + properties := make(map[string]string) + for _, header := range km.msg.Headers { + properties[header.Key] = string(header.Value) } return properties } diff --git a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go index 38d19d06d742564836acf3b4214fdfcc72abaf7d..29721328d659acd1085b3a18fafebd651ba60c64 100644 --- a/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/internal/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -26,7 +26,7 @@ func (kp *kafkaProducer) Topic() string { } func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) { - var headers []kafka.Header + headers := make([]kafka.Header, 0, len(message.Properties)) for key, value := range message.Properties { header := kafka.Header{Key: key, Value: []byte(value)} headers = append(headers, header)