channel.go 1.2 KB
Newer Older
S
stormgbs 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
package events

import (
	"fmt"
	"sync"
)

// Channel provides a sink that can be listened on. The writer and channel
// listener must operate in separate goroutines.
//
// Consumers should listen on Channel.C until the channel returned by Done is
// closed.
type Channel struct {
	// C carries the events passed to Write.
	C chan Event

	// closed is closed by Close to signal that the sink has been shut down.
	closed chan struct{}
	// once guards the close of closed so that repeated Close calls do not
	// close it twice.
	once   sync.Once
}

// NewChannel constructs a Channel sink whose event channel C has the given
// buffer capacity. A buffer of zero yields an unbuffered channel.
func NewChannel(buffer int) *Channel {
	sink := new(Channel)
	sink.C = make(chan Event, buffer)
	sink.closed = make(chan struct{})
	return sink
}

// Done exposes the sink's closure signal. The returned channel is closed by
// Close, so a receive on it proceeds once the sink has been closed.
func (ch *Channel) Done() chan struct{} {
	signal := ch.closed
	return signal
}

// Write delivers the event to the channel C. It must be called from a
// goroutine other than the listener's, because it blocks until the event is
// accepted on C or the sink is closed.
//
// It returns nil once the event has been handed to C, and ErrSinkClosed if
// the sink was already closed when Write was called or is closed while Write
// is blocked.
func (ch *Channel) Write(event Event) error {
	// Check for closure first. A select chooses randomly among ready cases,
	// so without this guard a write to an already-closed sink could still be
	// accepted whenever C has buffer space or a waiting receiver.
	select {
	case <-ch.closed:
		return ErrSinkClosed
	default:
	}

	select {
	case ch.C <- event:
		return nil
	case <-ch.closed:
		return ErrSinkClosed
	}
}

// Close shuts the sink down by closing the channel returned by Done. The
// close is performed through the sink's once, so repeated calls do not close
// the channel a second time. The returned error is always nil.
func (ch *Channel) Close() error {
	shutdown := func() {
		close(ch.closed)
	}
	ch.once.Do(shutdown)
	return nil
}

// String formats the sink for printing. Only the C and closed channels are
// rendered; the sync.Once is deliberately left out of the formatted value to
// avoid a data race.
func (ch *Channel) String() string {
	fields := make(map[string]interface{}, 2)
	fields["closed"] = ch.closed
	fields["C"] = ch.C
	return fmt.Sprint(fields)
}