consumers.go 4.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
package common

import (
	"fmt"

	sarama "gopkg.in/Shopify/sarama.v1"
)

// A PartitionOffset is the offset into a partition of the input log.
type PartitionOffset struct {
	// The ID of the partition.
	Partition int32
	// The offset into the partition.
	Offset int64
}

// A PartitionStorer has the storage APIs needed by the consumer.
type PartitionStorer interface {
	// PartitionOffsets returns the offsets the consumer has reached for each partition.
	PartitionOffsets(topic string) ([]PartitionOffset, error)
	// SetPartitionOffset records where the consumer has reached for a partition.
	SetPartitionOffset(topic string, partition int32, offset int64) error
}

// A ContinualConsumer continually consumes logs even across restarts. It requires a PartitionStorer to
// remember the offset it reached.
type ContinualConsumer struct {
	// The kafkaesque topic to consume events from.
	// This is the name used in kafka to identify the stream to consume events from.
	Topic string
	// A kafkaesque stream consumer providing the APIs for talking to the event source.
	// The interface is taken from a client library for Apache Kafka.
	// But any equivalent event streaming protocol could be made to implement the same interface.
	Consumer sarama.Consumer
	// A thing which can load and save partition offsets for a topic.
	PartitionStore PartitionStorer
	// ProcessMessage is a function which will be called for each message in the log. Return an error to
	// stop processing messages. See ErrShutdown for specific control signals.
	ProcessMessage func(msg *sarama.ConsumerMessage) error
	// ShutdownCallback is called when ProcessMessage returns ErrShutdown, after the partition has been saved.
	// It is optional.
	ShutdownCallback func()
}

// ErrShutdown can be returned from ContinualConsumer.ProcessMessage to stop the ContinualConsumer.
var ErrShutdown = fmt.Errorf("shutdown")

// Start starts the consumer consuming.
// Starts up a goroutine for each partition in the kafka stream.
// Returns nil once all the goroutines are started.
// Returns an error if it can't start consuming for any of the partitions.
func (c *ContinualConsumer) Start() error {
	offsets := map[int32]int64{}

	partitions, err := c.Consumer.Partitions(c.Topic)
	if err != nil {
		return err
	}
	for _, partition := range partitions {
		// Default all the offsets to the beginning of the stream.
		offsets[partition] = sarama.OffsetOldest
	}

	storedOffsets, err := c.PartitionStore.PartitionOffsets(c.Topic)
	if err != nil {
		return err
	}
	for _, offset := range storedOffsets {
		// We've already processed events from this partition so advance the offset to where we got to.
84 85 86
		// ConsumePartition will start streaming from the message with the given offset (inclusive),
		// so increment 1 to avoid getting the same message a second time.
		offsets[offset.Partition] = 1 + offset.Offset
87 88 89 90 91 92 93
	}

	var partitionConsumers []sarama.PartitionConsumer
	for partition, offset := range offsets {
		pc, err := c.Consumer.ConsumePartition(c.Topic, partition, offset)
		if err != nil {
			for _, p := range partitionConsumers {
E
Erik Johnston 已提交
94
				p.Close() // nolint: errcheck
95 96 97 98 99 100 101 102 103 104 105 106 107 108
			}
			return err
		}
		partitionConsumers = append(partitionConsumers, pc)
	}
	for _, pc := range partitionConsumers {
		go c.consumePartition(pc)
	}

	return nil
}

// consumePartition consumes the room events for a single partition of the kafkaesque stream.
func (c *ContinualConsumer) consumePartition(pc sarama.PartitionConsumer) {
E
Erik Johnston 已提交
109
	defer pc.Close() // nolint: errcheck
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
	for message := range pc.Messages() {
		msgErr := c.ProcessMessage(message)
		// Advance our position in the stream so that we will start at the right position after a restart.
		if err := c.PartitionStore.SetPartitionOffset(c.Topic, message.Partition, message.Offset); err != nil {
			panic(fmt.Errorf("the ContinualConsumer failed to SetPartitionOffset: %s", err))
		}
		// Shutdown if we were told to do so.
		if msgErr == ErrShutdown {
			if c.ShutdownCallback != nil {
				c.ShutdownCallback()
			}
			return
		}
	}
}