未验证 提交 b5f178f2 编写于 作者: S smellthemoon 提交者: GitHub

Fix kafka panic (#20860)

Signed-off-by: Nlixinguo <xinguo.li@zilliz.com>
Signed-off-by: Nlixinguo <xinguo.li@zilliz.com>
Co-authored-by: Nlixinguo <xinguo.li@zilliz.com>
上级 6824188b
......@@ -48,7 +48,7 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) {
assert.Equal(t, "333", msg.Properties()[common.TraceIDKey])
assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 0)
assert.True(t, len(msg.Properties()) == 1)
}
func TestKafkaConsumer_SeekInclusive(t *testing.T) {
......@@ -74,7 +74,7 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) {
assert.Equal(t, "222", msg.Properties()[common.TraceIDKey])
assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 0)
assert.True(t, len(msg.Properties()) == 1)
}
func TestKafkaConsumer_GetSeek(t *testing.T) {
......
......@@ -2,7 +2,6 @@ package kafka
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
)
......@@ -15,15 +14,9 @@ func (km *kafkaMessage) Topic() string {
}
func (km *kafkaMessage) Properties() map[string]string {
if len(km.msg.Headers) == 0 {
return nil
}
var properties map[string]string
for i := 0; i < len(km.msg.Headers); i++ {
if _, ok := properties[km.msg.Headers[i].Key]; ok {
log.Info("Repeated key in kafka message headers")
}
properties[km.msg.Headers[i].Key] = string(km.msg.Headers[i].Value)
properties := make(map[string]string)
for _, header := range km.msg.Headers {
properties[header.Key] = string(header.Value)
}
return properties
}
......
......@@ -26,7 +26,7 @@ func (kp *kafkaProducer) Topic() string {
}
func (kp *kafkaProducer) Send(ctx context.Context, message *mqwrapper.ProducerMessage) (mqwrapper.MessageID, error) {
var headers []kafka.Header
headers := make([]kafka.Header, 0, len(message.Properties))
for key, value := range message.Properties {
header := kafka.Header{Key: key, Value: []byte(value)}
headers = append(headers, header)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册