kafka_consumer.go 7.8 KB
Newer Older
J
jaime 已提交
1 2 3 4 5 6
package kafka

import (
	"sync"
	"time"

7
	"github.com/cockroachdb/errors"
J
jaime 已提交
8 9
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"go.uber.org/zap"
10 11 12 13

	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
	"github.com/milvus-io/milvus/pkg/util/merr"
J
jaime 已提交
14 15 16
)

type Consumer struct {
X
Xiaofan 已提交
17 18 19 20 21 22 23 24 25 26
	c          *kafka.Consumer
	config     *kafka.ConfigMap
	msgChannel chan mqwrapper.Message
	hasAssign  bool
	skipMsg    bool
	topic      string
	groupID    string
	chanOnce   sync.Once
	closeOnce  sync.Once
	closeCh    chan struct{}
27
	wg         sync.WaitGroup
J
jaime 已提交
28 29
}

X
Xiaofan 已提交
30
// timeout is the time limit handed to the kafka client's Seek and
// QueryWatermarkOffsets calls; confluent-kafka-go interprets it as milliseconds.
const timeout = 3000
J
jaime 已提交
31

X
Xiaofan 已提交
32 33 34
func newKafkaConsumer(config *kafka.ConfigMap, topic string, groupID string, position mqwrapper.SubscriptionInitialPosition) (*Consumer, error) {
	msgChannel := make(chan mqwrapper.Message, 256)
	kc := &Consumer{
J
jaime 已提交
35 36 37 38
		config:     config,
		msgChannel: msgChannel,
		topic:      topic,
		groupID:    groupID,
X
Xiaofan 已提交
39 40 41 42 43 44
		closeCh:    make(chan struct{}),
	}

	err := kc.createKafkaConsumer()
	if err != nil {
		return nil, err
J
jaime 已提交
45 46
	}

X
Xiaofan 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
	// if it's unknown, we leave the assign to seek
	if position != mqwrapper.SubscriptionPositionUnknown {
		var offset kafka.Offset
		if position == mqwrapper.SubscriptionPositionEarliest {
			offset, err = kafka.NewOffset("earliest")
			if err != nil {
				return nil, err
			}
		} else {
			latestMsgID, err := kc.GetLatestMsgID()

			if err != nil {
				switch v := err.(type) {
				case kafka.Error:
					if v.Code() == kafka.ErrUnknownTopic || v.Code() == kafka.ErrUnknownPartition || v.Code() == kafka.ErrUnknownTopicOrPart {
						log.Warn("get latest msg ID failed, topic or partition does not exists!",
							zap.String("topic", kc.topic),
							zap.String("err msg", v.String()))
						offset, err = kafka.NewOffset("earliest")
						if err != nil {
							return nil, err
						}
					}
				default:
					log.Error("kafka get latest msg ID failed", zap.String("topic", kc.topic), zap.Error(err))
					return nil, err
				}
			} else {
				offset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
				kc.skipMsg = true
			}
		}

		start := time.Now()
		topicPartition := []kafka.TopicPartition{{Topic: &topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}}
		err = kc.c.Assign(topicPartition)
		if err != nil {
			log.Error("kafka consumer assign failed ", zap.String("topic name", topic), zap.Any("Msg position", position), zap.Error(err))
			return nil, err
		}
		cost := time.Since(start).Milliseconds()
		if cost > 200 {
			log.Warn("kafka consumer assign take too long!", zap.String("topic name", topic), zap.Any("Msg position", position), zap.Int64("time cost(ms)", cost))
		}

		kc.hasAssign = true
	}

	return kc, nil
J
jaime 已提交
96 97 98 99 100 101
}

func (kc *Consumer) createKafkaConsumer() error {
	var err error
	kc.c, err = kafka.NewConsumer(kc.config)
	if err != nil {
102
		log.Error("create kafka consumer failed", zap.String("topic", kc.topic), zap.Error(err))
J
jaime 已提交
103 104 105 106 107 108 109 110 111 112 113 114 115 116
		return err
	}
	return nil
}

// Subscription returns the consumer group ID this consumer was created with.
func (kc *Consumer) Subscription() string {
	return kc.groupID
}

// Chan provides a channel to read consumed message.
// confluent-kafka-go recommend us to use function-based consumer,
// channel-based consumer API had already deprecated, see more details
// https://github.com/confluentinc/confluent-kafka-go.
func (kc *Consumer) Chan() <-chan mqwrapper.Message {
X
Xiaofan 已提交
117 118 119 120
	if !kc.hasAssign {
		log.Error("can not chan with not assigned channel", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
		panic("failed to chan a kafka consumer without assign")
	}
J
jaime 已提交
121
	kc.chanOnce.Do(func() {
122
		kc.wg.Add(1)
123
		go func() {
124
			defer kc.wg.Done()
X
Xiaofan 已提交
125 126 127 128 129 130 131 132
			for {
				select {
				case <-kc.closeCh:
					log.Info("close consumer ", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID))
					start := time.Now()
					err := kc.c.Close()
					if err != nil {
						log.Warn("failed to close ", zap.String("topic", kc.topic), zap.Error(err))
133
					}
X
Xiaofan 已提交
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
					cost := time.Since(start).Milliseconds()
					if cost > 200 {
						log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
					}
					if kc.msgChannel != nil {
						close(kc.msgChannel)
					}
					return
				default:
					e, err := kc.c.ReadMessage(30 * time.Second)
					if err != nil {
						// if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt
						log.Warn("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
					} else {
						if kc.skipMsg {
							kc.skipMsg = false
							continue
						}
						kc.msgChannel <- &kafkaMessage{msg: e}
153 154 155 156
					}
				}
			}
		}()
J
jaime 已提交
157
	})
158

J
jaime 已提交
159 160 161 162
	return kc.msgChannel
}

func (kc *Consumer) Seek(id mqwrapper.MessageID, inclusive bool) error {
X
Xiaofan 已提交
163 164 165 166
	if kc.hasAssign {
		return errors.New("kafka consumer is already assigned, can not seek again")
	}

167 168 169 170 171
	offset := kafka.Offset(id.(*kafkaID).messageID)
	return kc.internalSeek(offset, inclusive)
}

func (kc *Consumer) internalSeek(offset kafka.Offset, inclusive bool) error {
X
Xiaofan 已提交
172
	log.Info("kafka consumer seek start", zap.String("topic name", kc.topic),
J
jaime 已提交
173 174
		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive))

175
	start := time.Now()
J
jaime 已提交
176 177
	err := kc.c.Assign([]kafka.TopicPartition{{Topic: &kc.topic, Partition: mqwrapper.DefaultPartitionIdx, Offset: offset}})
	if err != nil {
X
Xiaofan 已提交
178
		log.Warn("kafka consumer assign failed ", zap.String("topic name", kc.topic), zap.Any("Msg offset", offset), zap.Error(err))
J
jaime 已提交
179 180 181
		return err
	}

182
	cost := time.Since(start).Milliseconds()
X
Xiaofan 已提交
183 184
	if cost > 200 {
		log.Warn("kafka consumer assign take too long!", zap.String("topic name", kc.topic),
185 186 187
			zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))
	}

J
jaime 已提交
188 189 190 191 192 193
	// If seek timeout is not 0 the call twice will return error isStarted RD_KAFKA_RESP_ERR__STATE.
	// if the timeout is 0 it will initiate the seek  but return immediately without any error reporting
	kc.skipMsg = !inclusive
	if err := kc.c.Seek(kafka.TopicPartition{
		Topic:     &kc.topic,
		Partition: mqwrapper.DefaultPartitionIdx,
X
Xiaofan 已提交
194
		Offset:    offset}, timeout); err != nil {
J
jaime 已提交
195 196
		return err
	}
197
	cost = time.Since(start).Milliseconds()
X
Xiaofan 已提交
198
	log.Info("kafka consumer seek finished", zap.String("topic name", kc.topic),
199 200
		zap.Any("Msg offset", offset), zap.Bool("inclusive", inclusive), zap.Int64("time cost(ms)", cost))

X
Xiaofan 已提交
201
	kc.hasAssign = true
J
jaime 已提交
202 203 204 205
	return nil
}

func (kc *Consumer) Ack(message mqwrapper.Message) {
J
jaime 已提交
206 207 208
	// Do nothing
	// Kafka retention mechanism only depends on retention configuration,
	// it does not relate to the commit with consumer's offsets.
J
jaime 已提交
209 210 211
}

func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
X
Xiaofan 已提交
212
	low, high, err := kc.c.QueryWatermarkOffsets(kc.topic, mqwrapper.DefaultPartitionIdx, timeout)
J
jaime 已提交
213 214 215 216 217 218 219 220 221 222
	if err != nil {
		return nil, err
	}

	// Current high value is next offset of the latest message ID, in order to keep
	// semantics consistency with the latest message ID, the high value need to move forward.
	if high > 0 {
		high = high - 1
	}

X
Xiaofan 已提交
223
	log.Info("get latest msg ID ", zap.Any("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
J
jaime 已提交
224 225 226
	return &kafkaID{messageID: high}, nil
}

S
smellthemoon 已提交
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
// CheckTopicValid verifies that the consumer's topic exists and is empty.
//
// It returns a topic-not-found error when kafka reports an unknown topic or
// partition, a topic-not-empty error when the latest message ID is not at the
// earliest position, and the underlying error for any other failure. The topic
// argument is only used to label the returned errors; the query itself runs
// against the topic the consumer was created with.
func (kc *Consumer) CheckTopicValid(topic string) error {
	// Keep the logger returned by With; otherwise the topic field is lost.
	logger := log.With(zap.String("topic", kc.topic))

	// check topic is existed
	latestMsgID, err := kc.GetLatestMsgID()
	if err != nil {
		if kafkaErr, ok := err.(kafka.Error); ok {
			switch kafkaErr.Code() {
			case kafka.ErrUnknownTopic, kafka.ErrUnknownPartition, kafka.ErrUnknownTopicOrPart:
				return merr.WrapErrTopicNotFound(topic, "topic get latest msg ID failed, topic or partition does not exists")
			}
		}
		// Any other failure must be returned as well: latestMsgID is nil here
		// and must not be dereferenced below.
		return err
	}

	// check topic is empty
	// NOTE(review): GetLatestMsgID reports 0 for both an empty partition and a
	// partition with a single message — confirm AtEarliestPosition tells them apart.
	if !latestMsgID.AtEarliestPosition() {
		return merr.WrapErrTopicNotEmpty(topic, "topic is not empty")
	}
	logger.Info("created topic is empty")

	return nil
}

J
jaime 已提交
251 252
func (kc *Consumer) Close() {
	kc.closeOnce.Do(func() {
X
Xiaofan 已提交
253
		close(kc.closeCh)
254
		kc.wg.Wait() // wait worker exist and close the client
J
jaime 已提交
255 256
	})
}