未验证 提交 2b659e25 编写于 作者: A Avi Aryan

divide project into private and oss components #23

上级 e4ececec
......@@ -6,3 +6,4 @@
# macOS
// +build oss
package main
import (
// +build !oss
package main
import (
adaptor "github.com/appbaseio/abc/adaptorx"
// runAbout implements the "abc about [adaptor]" command. It parses args with
// the "about" flag set and prints "name - description" for every adaptor
// returned by adaptor.Adaptors(). When a positional argument is given, only
// that adaptor is listed and its sample configuration is printed as well.
// Adaptors that do not implement adaptor.Describable are reported as
// "no description available".
// NOTE(review): this excerpt comes from a condensed diff view; closing braces
// and possibly other lines are not shown.
func runAbout(args []string) error {
flagset := baseFlagSet("about")
flagset.Usage = usageFor(flagset, "abc about [adaptor]")
if err := flagset.Parse(args); err != nil {
return err
// Only the positional arguments left after flag parsing are considered.
args = flagset.Args()
var adaptors = adaptor.Adaptors()
if len(args) > 0 {
// NOTE(review): the error from GetAdaptor is discarded; confirm how an
// unknown adaptor name is meant to be reported.
a, _ := adaptor.GetAdaptor(args[0], map[string]interface{}{})
// Narrow the listing to the single requested adaptor.
adaptors = map[string]adaptor.Adaptor{args[0]: a}
for name, a := range adaptors {
if d, ok := a.(adaptor.Describable); ok {
fmt.Printf("%s - %s\n", name, d.Description())
if len(args) > 0 {
// A specific adaptor was requested, so its sample configuration is shown too.
fmt.Printf("\n Sample configuration:\n%s\n", d.SampleConfig())
} else {
fmt.Printf("%s - %s\n", name, "no description available")
return nil
......@@ -13,10 +13,10 @@ import (
......@@ -5,7 +5,7 @@ import (
func runInit(args []string) error {
......@@ -7,8 +7,8 @@ import (
_ "github.com/appbaseio/abc/adaptor/all"
_ "github.com/appbaseio/abc/function/all"
_ "github.com/appbaseio/abc/imports/all"
// +build !oss
package adaptor
import (
// import
// Adaptors re-exports adaptorx.Adaptors so callers can reach it through this package.
var Adaptors = adaptorx.Adaptors
// GetAdaptor re-exports adaptorx.GetAdaptor so callers can reach it through this package.
var GetAdaptor = adaptorx.GetAdaptor
// Adaptor export
type Adaptor interface {
// Describable is a type defined in this package whose underlying type is
// adaptorx.Describable.
type Describable adaptorx.Describable
// RegisteredAdaptors returns the list reported by adaptorx.RegisteredAdaptors.
func RegisteredAdaptors() []string {
return adaptorx.RegisteredAdaptors()
// Error ...
// type Error adaptorx.Error
// // ERROR ...
// const ERROR = adaptorx.ERROR
// // CRITICAL ...
// const CRITICAL = adaptorx.CRITICAL
// +build oss
package adaptor
import (
// ...
// Adaptors re-exports adaptor.Adaptors from the imported adaptor package
// (the import lines are not shown in this excerpt).
var Adaptors = adaptor.Adaptors
// GetAdaptor re-exports adaptor.GetAdaptor from the imported adaptor package.
var GetAdaptor = adaptor.GetAdaptor
// Adaptor export
type Adaptor interface {
// Describable is a type defined in this package whose underlying type is
// adaptor.Describable.
type Describable adaptor.Describable
// RegisteredAdaptors returns the list reported by adaptor.RegisteredAdaptors.
func RegisteredAdaptors() []string {
return adaptor.RegisteredAdaptors()
// Error ...
// type Error adaptor.Error
// // ERROR ...
// const ERROR = adaptor.ERROR
// // CRITICAL ...
// const CRITICAL = adaptor.CRITICAL
// // Error returns the error as a string
// func (t Error) Error() string {
// return fmt.Sprintf("%s: %s", t.Lvl, t.Err)
// }
// +build !oss
package all
import (
// import
_ "github.com/appbaseio/abc/adaptorx/all"
// +build oss
package all
import (
// import
_ "github.com/appbaseio/abc/adaptor/all"
// +build oss
// Copyright 2014 The Transporter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !oss
// Copyright 2014 The Transporter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package pipeline provides all implemented functionality to move
// data through transporter.
package pipeline
import (
adaptor "github.com/appbaseio/abc/adaptorx"
var (
// ErrResumeTimedOut is returned when the resumeTimeout is reached after attempting
// to check that a sink offset matches the newest offset.
ErrResumeTimedOut = errors.New("resume timeout reached")
// ErrResumeStopped is returned when the underlying pipe.Pipe has been stopped while
// a Node is in the process of resuming.
ErrResumeStopped = errors.New("pipe has been stopped, canceling resume")
// OptionFunc is a function that configures a Node.
// NewNodeWithOptions applies each OptionFunc in the order given and stops at
// the first one that returns a non-nil error.
type OptionFunc func(*Node) error
// A Node is the basic building block of transporter pipelines.
// Nodes are constructed in a tree, with the first node broadcasting
// data to each of its children.
type Node struct {
Name string
Type string
// path is the node's Name for a root, or parent.path + "/" + Name once
// WithParent has been applied.
path string
// depth is 1 for a root and parent.depth + 1 for a child.
depth int
children []*Node
parent *Node
transforms []*Transform
// nsFilter is compiled from the ns argument of NewNodeWithOptions.
nsFilter *regexp.Regexp
c client.Client
reader client.Reader
writer client.Writer
// done is handed to the adaptor's Writer and to the read function in start.
done chan struct{}
wg sync.WaitGroup
l log.Logger
pipe *pipe.Pipe
// clog and om are nil unless set with WithCommitLog / WithOffsetManager.
clog *commitlog.CommitLog
om offset.Manager
// resumeTimeout bounds how long resume waits for offsets to match; it
// defaults to 60 seconds in NewNodeWithOptions.
resumeTimeout time.Duration
// Transform defines the struct for including a native function in the pipeline.
// Node.applyTransforms checks NsFilter against the message namespace and calls
// Fn.Apply on the message; Name is used in its log output.
type Transform struct {
Name string
Fn function.Function
NsFilter *regexp.Regexp
// NewNodeWithOptions initializes a Node with the required parameters and then applies
// each OptionFunc provided, in order, returning the first error encountered.
//
// ns has leading and trailing "/" characters trimmed and is compiled as the
// node's namespace filter; a compile failure is returned to the caller.
// Until an option replaces them, the node uses client.Mock, client.MockReader
// and client.MockWriter, a new pipe, and a 60 second resume timeout.
func NewNodeWithOptions(name, kind, ns string, options ...OptionFunc) (*Node, error) {
compiledNs, err := regexp.Compile(strings.Trim(ns, "/"))
if err != nil {
return nil, err
n := &Node{
Name: name,
Type: kind,
path: name,
depth: 1,
nsFilter: compiledNs,
pipe: pipe.NewPipe(nil, ""),
children: make([]*Node, 0),
transforms: make([]*Transform, 0),
done: make(chan struct{}),
c: &client.Mock{},
reader: &client.MockReader{},
writer: &client.MockWriter{},
resumeTimeout: 60 * time.Second,
// Run the options on it
for _, option := range options {
if err := option(n); err != nil {
return nil, err
return n, nil
// WithClient sets the client.Client to be used for providing a client.Session to the
// client.Reader/Writer. The client is obtained from a.Client(); the error it
// returns becomes the option's result, and n.c is assigned in either case.
func WithClient(a adaptor.Adaptor) OptionFunc {
return func(n *Node) error {
cli, err := a.Client()
n.c = cli
return err
// WithReader sets the client.Reader to be used to source data from. The reader
// is obtained from a.Reader(); the error it returns becomes the option's
// result, and n.reader is assigned in either case.
func WithReader(a adaptor.Adaptor) OptionFunc {
return func(n *Node) error {
r, err := a.Reader()
n.reader = r
return err
// WithWriter sets the client.Writer to be used to send data to. The writer is
// obtained from a.Writer, which receives the node's done channel and wait
// group; the error it returns becomes the option's result, and n.writer is
// assigned in either case.
func WithWriter(a adaptor.Adaptor) OptionFunc {
return func(n *Node) error {
w, err := a.Writer(n.done, &n.wg)
n.writer = w
return err
// WithParent sets the parent node and reconfigures the pipe. The node gets a
// new pipe created from the parent's pipe, is appended to the parent's
// children, and has its path and depth derived from the parent's.
func WithParent(parent *Node) OptionFunc {
return func(n *Node) error {
n.parent = parent
// TODO: remove path param
n.pipe = pipe.NewPipe(parent.pipe, "")
parent.children = append(parent.children, n)
n.path = parent.path + "/" + n.Name
n.depth = parent.depth + 1
return nil
// WithTransforms sets the transforms to be applied in the pipeline. The slice
// replaces any transforms already on the node and is stored without copying.
func WithTransforms(t []*Transform) OptionFunc {
return func(n *Node) error {
n.transforms = t
return nil
// WithCommitLog configures a CommitLog for the reader to persist messages.
// The log is created with commitlog.New(options...); the error it returns
// becomes the option's result, and n.clog is assigned in either case.
// func WithCommitLog(dataDir string, maxBytes int) OptionFunc {
func WithCommitLog(options ...commitlog.OptionFunc) OptionFunc {
return func(n *Node) error {
clog, err := commitlog.New(options...)
n.clog = clog
return err
// WithResumeTimeout configures how long resume waits for all sink offsets to
// match the newest offset before giving up. It overrides the 60 second
// default set in NewNodeWithOptions.
func WithResumeTimeout(timeout time.Duration) OptionFunc {
return func(n *Node) error {
n.resumeTimeout = timeout
return nil
// WithOffsetManager configures an offset.Manager to track message offsets.
// Code paths in this file that use n.om first check it for nil or run only
// on nodes expected to have one.
// func WithOffsetManager(name, dataDir string) OptionFunc {
func WithOffsetManager(om offset.Manager) OptionFunc {
return func(n *Node) error {
n.om = om
return nil
// String renders this node and, on the following lines, each of its
// descendants. Every line holds a prefix ("- Source: " for the root,
// "- Sink: " otherwise) padded according to the node's depth, followed by the
// node's name, type and namespace filter in fixed-width columns.
func (n *Node) String() string {
var (
s, prefix string
// prefixformatter is itself a format string, e.g. "%1s%-17s" at depth 1.
prefixformatter := fmt.Sprintf("%%%ds%%-%ds", n.depth, 18-n.depth)
if n.parent == nil { // root node
prefix = fmt.Sprintf(prefixformatter, " ", "- Source: ")
} else {
prefix = fmt.Sprintf(prefixformatter, " ", "- Sink: ")
s += fmt.Sprintf("%s %-40s %-15s %-30s", prefix, n.Name, n.Type, n.nsFilter.String())
// Children are rendered recursively, one per line.
for _, child := range n.children {
s += "\n" + child.String()
return s
// Start starts the node's children, each in its own goroutine, and then runs
// this node. A root node (no parent) calls n.start and emits messages to its
// children; every other node calls n.listen.
//
// When the root has a commit log, Start first checks whether the log holds
// messages, lets children whose newest offset lags the log resume from it,
// and builds a per-namespace client.MessageSet map that is passed to n.start.
// NOTE(review): this excerpt comes from a condensed diff view; closing braces
// and some statements are not shown.
func (n *Node) Start() error {
n.l = log.With("name", n.Name).With("type", n.Type).With("path", n.path)
for _, child := range n.children {
child.l = log.With("name", child.Name).With("type", child.Type).With("path", child.path)
go func(node *Node) {
if err := node.Start(); err != nil {
if n.parent == nil {
// msgMap is the resume metadata handed to n.start, keyed by namespace.
msgMap := make(map[string]client.MessageSet)
if n.clog != nil {
nsOffsetMap := make(map[string]uint64)
// errc has capacity for one result per child.
errc := make(chan error, len(n.children))
// TODO: not entirely sure about this logic check...
if n.clog.OldestOffset() != n.clog.NewestOffset() {
n.l.With("newestOffset", n.clog.NewestOffset()).
With("oldestOffset", n.clog.OldestOffset()).
Infoln("existing messages in commitlog, checking writer offsets...")
for _, child := range n.children {
n.l.With("name", child.Name).Infof("offsetMap: %+v", child.om.OffsetMap())
// we subtract 1 from NewestOffset() because we only need to catch up
// to the last entry in the log
if child.om.NewestOffset() < (n.clog.NewestOffset() - 1) {
r, err := n.clog.NewReader(child.om.NewestOffset())
if err != nil {
return err
// NOTE(review): the goroutine body refers to the range variable child
// rather than a parameter; before Go 1.22 that variable is shared across
// iterations — confirm this resumes the intended child.
go func(r io.Reader) {
errc <- child.resume(n.clog.NewestOffset()-1, r)
} else {
errc <- nil
n.l.Infoln("waiting for all children to resume...")
err := <-errc
// The body of this loop is not shown in this excerpt; presumably it
// receives the remaining results from errc — confirm.
for i := 1; i < cap(errc); i++ {
n.l.Infoln("done waiting for all children to resume")
if err != nil {
return err
n.l.Infoln("done checking for resume errors")
// compute a map of the oldest offset for every namespace from each child
for _, child := range n.children {
for ns, offset := range child.om.OffsetMap() {
if currentOffset, ok := nsOffsetMap[ns]; !ok || currentOffset > offset {
nsOffsetMap[ns] = offset
// Read the log entry at each recorded offset to recover its message and mode.
for _, offset := range nsOffsetMap {
r, err := n.clog.NewReader(int64(offset))
if err != nil {
return err
d, err := readResumeData(r)
if err != nil {
return err
mode := d.msg.Mode
// we overwrite the mode to Complete unless the offset
// was the last message processed
if mode == commitlog.Copy && int64(offset) != (n.clog.NewestOffset()-1) {
mode = commitlog.Complete
msgMap[d.ns] = client.MessageSet{
Msg: d.msg.Msg,
Timestamp: d.msg.Timestamp,
Mode: mode,
n.l.Infof("starting with metadata %+v", msgMap)
return n.start(msgMap)
return n.listen()
// resume replays commit log entries to this node. It reads entries from r with
// readResumeData and sends each one into n.pipe.In as a pipe.TrackedMessage,
// logging progress whenever it has advanced by at least one percentage point.
// It returns ErrResumeStopped when the pipe is found stopped, and any error
// from readResumeData as is.
//
// After the entry whose offset equals newestOffset has been sent, it polls
// n.om.NewestOffset() every 5 seconds. It returns nil once that value equals
// newestOffset, or ErrResumeTimedOut once n.resumeTimeout has elapsed.
// NOTE(review): closing braces and some control-flow statements are not shown
// in this excerpt.
func (n *Node) resume(newestOffset int64, r io.Reader) error {
n.l.Infoln("adaptor Resuming...")
defer func() {
n.l.Infoln("adaptor Resume complete")
percentComplete := 0.0
for {
d, err := readResumeData(r)
if err != nil {
return err
// p is the progress through the log as a percentage of newestOffset.
p := (float64(d.offset) / float64(newestOffset)) * 100.0
if (p - percentComplete) >= 1.0 {
percentComplete = p
n.l.With("offset", d.offset).
With("log_offset", newestOffset).
With("percent_complete", percentComplete).
Infoln("still resuming...")
if n.pipe.Stopped {
return ErrResumeStopped
n.pipe.In <- pipe.TrackedMessage{
Msg: d.msg.Msg,
Off: offset.Offset{
Namespace: d.msg.Msg.Namespace(),
LogOffset: d.offset,
Timestamp: time.Now().Unix(),
if d.offset == uint64(newestOffset) {
n.l.Infoln("offset of message sent down pipe matches newestOffset")
n.l.With("timeout", n.resumeTimeout).Infoln("all messages sent down pipeline, waiting for offsets to match...")
// A single timer covers the whole wait, not each polling iteration.
timeout := time.After(n.resumeTimeout)
for {
select {
case <-timeout:
return ErrResumeTimedOut
n.l.Infoln("checking if offsets match")
sinkOffset := n.om.NewestOffset()
if sinkOffset == newestOffset {
n.l.Infoln("offsets match!!!")
return nil
n.l.With("sink_offset", sinkOffset).With("newestOffset", newestOffset).Infoln("offsets did not match, checking again in 5 seconds...")
time.Sleep(5 * time.Second)
// start runs the node as a source. It connects with n.c.Connect, obtains a
// read function from n.reader.Read (given nsMap and a predicate backed by
// n.nsFilter), and forwards every message from the resulting channel to
// n.pipe.Send with an offset.Offset attached.
//
// When a commit log is configured, each message is first serialized and
// appended to it, and the offset returned by Append is the one attached.
// Errors from Connect, the read function and Append are returned to the caller.
// NOTE(review): closing braces and some statements (for example the session
// close call) are not shown in this excerpt.
func (n *Node) start(nsMap map[string]client.MessageSet) error {
n.l.Infoln("adaptor Starting...")
s, err := n.c.Connect()
if err != nil {
return err
// Sessions that implement client.Closer get deferred cleanup.
if closer, ok := s.(client.Closer); ok {
defer func() {
n.l.Infoln("closing session...")
n.l.Infoln("session closed...")
readFunc := n.reader.Read(nsMap, func(check string) bool { return n.nsFilter.MatchString(check) })
msgChan, err := readFunc(s, n.done)
if err != nil {
return err
// logOffset keeps its previous value when no commit log is configured.
var logOffset int64
for msg := range msgChan {
if n.clog != nil {
// NOTE(review): the errors from both Marshal calls are discarded.
d, _ := mejson.Marshal(msg.Msg.Data().AsMap())
b, _ := json.Marshal(d)
o, err := n.clog.Append(
Key: []byte(msg.Msg.Namespace()),
Mode: msg.Mode,
Op: msg.Msg.OP(),
Timestamp: uint64(msg.Timestamp),
Value: b,
if err != nil {
return err
logOffset = o
n.l.With("offset", logOffset).Debugln("attaching offset to message")
n.pipe.Send(msg.Msg, offset.Offset{
Namespace: msg.Msg.Namespace(),
LogOffset: uint64(logOffset),
Timestamp: time.Now().Unix(),
n.l.Infoln("adaptor Start finished...")
return nil
// listen runs the node as a sink: it hands n.write to n.pipe.Listen and
// returns whatever Listen returns, logging on entry and exit.
func (n *Node) listen() (err error) {
n.l.Infoln("adaptor Listening...")
defer n.l.Infoln("adaptor Listen closed...")
return n.pipe.Listen(n.write)
// write is the callback passed to n.pipe.Listen. It skips messages whose
// namespace does not match n.nsFilter, runs the node's transforms, and hands
// the result to client.Write with n.c and n.writer, returning that call's
// message and error.
//
// When an offset manager is configured, the offset of a skipped message or of
// one dropped by a transform is committed directly. For a written message the
// commit is left to confirmWrite, which waits on the message's confirms channel.
// NOTE(review): closing braces and some statements are not shown in this excerpt.
func (n *Node) write(msg message.Msg, off offset.Offset) (message.Msg, error) {
var writeErr error
if !n.nsFilter.MatchString(msg.Namespace()) {
n.l.With("ns", msg.Namespace()).Debugln("message skipped by namespace filter")
if n.om != nil {
n.om.CommitOffset(off, false)
return msg, nil
msg, writeErr = n.applyTransforms(msg)
if writeErr != nil {
return nil, writeErr
} else if msg == nil {
// A nil message with no error means a transform dropped it.
if n.om != nil {
n.om.CommitOffset(off, false)
return nil, nil
if n.om != nil {
// The confirmation wait is bounded to 10 seconds.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// The deferred function inspects writeErr after the write; its body is not
// shown here, presumably it cancels ctx on failure — confirm.
defer func() {
if writeErr != nil {
msg = message.WithConfirms(make(chan struct{}), msg)
go n.confirmWrite(ctx, msg.Confirms(), off)
returnMsg, writeErr := client.Write(n.c, n.writer, msg)
return returnMsg, writeErr
// confirmWrite waits for a write to be confirmed or for ctx to finish.
// On a receive from confirmed it commits off through n.om, logging a failure
// from CommitOffset. When ctx is done it logs whether the wait was canceled
// or timed out.
// NOTE(review): the statements that leave the loop are not shown in this excerpt.
func (n *Node) confirmWrite(ctx context.Context, confirmed chan struct{}, off offset.Offset) {
for {
select {
case <-confirmed:
if err := n.om.CommitOffset(off, false); err != nil {
n.l.Errorf("failed to commitoffset, %s", err)
n.l.Debugf("offset %d committed", off.LogOffset)
case <-ctx.Done():
if ctx.Err() == context.Canceled {
n.l.Debugln("offset commit canceled")
} else if ctx.Err() == context.DeadlineExceeded {
n.l.Debugln("time expired waiting for offset commit confirmation")
// applyTransforms runs msg through the node's transforms in order and returns
// the transformed message. Messages whose operation is ops.Command bypass the
// transforms entirely.
//
// It returns (nil, err) when a transform fails, and (nil, nil) when a
// transform returns a nil message or one whose operation is ops.Skip.
// NOTE(review): closing braces are not shown in this excerpt; presumably a
// transform whose NsFilter does not match is skipped — confirm.
func (n *Node) applyTransforms(msg message.Msg) (message.Msg, error) {
if msg.OP() != ops.Command {
for _, transform := range n.transforms {
if !transform.NsFilter.MatchString(msg.Namespace()) {
n.l.With("transform", transform.Name).With("ns", msg.Namespace()).Debugln("filtered message")
m, err := transform.Fn.Apply(msg)
if err != nil {
n.l.Errorf("transform function error, %s", err)
return nil, err
} else if m == nil {
n.l.With("transform", transform.Name).Debugln("returned nil message, skipping")
return nil, nil
// Each transform receives the output of the previous one.
msg = m
if msg.OP() == ops.Skip {
n.l.With("transform", transform.Name).With("op", msg.OP()).Debugln("skipping message")
return nil, nil
return msg, nil
// Stop stops this node's adaptor and sends a stop to each child of this node.
// NOTE(review): only the loop header is visible in this excerpt; the loop body
// and the rest of the function are not shown.
func (n *Node) Stop() {
for _, node := range n.children {
// stop shuts this node down. It sets up deferred cleanup for n.writer and for
// n.c when they implement client.Closer, logging before and after each close.
// It always returns nil in the lines visible here.
// NOTE(review): the close calls themselves and the closing braces are not
// shown in this excerpt.
func (n *Node) stop() error {
n.l.Infoln("adaptor Stopping...")
if closer, ok := n.writer.(client.Closer); ok {
defer func() {
n.l.Infoln("closing writer...")
n.l.Infoln("writer closed...")
if closer, ok := n.c.(client.Closer); ok {
defer func() {
n.l.Infoln("closing connection...")
n.l.Infoln("connection closed...")
n.l.Infoln("adaptor Stopped")
return nil
// Validate ensures that the node tree conforms to a proper structure.
// Node trees must have at least one source and one sink: a root node without
// children is invalid, and the tree is valid only if every child validates
// recursively.
// Dangling transformers are forbidden. Validate only knows about default adaptors
// in the adaptor package; it can't validate any custom adaptors.
func (n *Node) Validate() bool {
if n.parent == nil && len(n.children) == 0 { // the root node should have children
return false
for _, child := range n.children {
if !child.Validate() {
return false
return true
// Endpoints recurses down the node tree and accumulates a map associating node
// name with node type. This is primarily used with the boot event.
// When two nodes share a name, the entry merged last replaces the earlier one.
func (n *Node) Endpoints() map[string]string {
m := map[string]string{n.Name: n.Type}
for _, child := range n.children {
childMap := child.Endpoints()
for k, v := range childMap {
m[k] = v
return m
// +build oss
package pipeline
import (
// +build !oss
package pipeline
import (
adaptor "github.com/appbaseio/abc/adaptorx"
// A Pipeline is the end-to-end description of a transporter data flow,
// including the source, sink, and all the transformers along the way.
type Pipeline struct {
// source is the root of the node tree.
source *Node
emitter events.Emitter
// metricsTicker is created in NewPipeline from the interval argument.
metricsTicker *time.Ticker
// version is included in the boot and exit events.
version string
// Err is the fatal error that was sent from the adaptor
// that caused us to stop this process. If this is nil, then
// the transporter is running
Err error
// done is watched by the error listener and the metrics gatherer.
done chan struct{}
// NewDefaultPipeline returns a new Transporter Pipeline with the given node tree, and
// uses events.HTTPPostEmitter(uri, key, pid) to deliver metrics. It is a thin
// wrapper around NewPipeline.
//
// NOTE(review): the example below does not match the current signatures
// (NewNodeWithOptions returns two values; NewDefaultPipeline takes uri, key,
// pid, version and interval) and should be refreshed.
// eg.
// a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongo://localhost:27017"})
// if err != nil {
// fmt.Println(err)
// os.Exit(1)
// }
// source := pipeline.NewNodeWithOptions(
// "source", "mongo", "/.*/",
// pipeline.WithClient(a),
// pipeline.WithReader(a),
// )
// f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
// sink := pipeline.NewNodeWithOptions(
// "out", "file", "/.*/",
// pipeline.WithClient(f),
// pipeline.WithWriter(f),
// pipeline.WithParent(source),
// )
// pipeline, err := transporter.NewDefaultPipeline(source, events.Api{URI: "http://localhost/endpoint"}, 1*time.Second)
// if err != nil {
// fmt.Println(err)
// os.Exit(1)
// }
// pipeline.Run()
func NewDefaultPipeline(source *Node, uri, key, pid, version string, interval time.Duration) (*Pipeline, error) {
return NewPipeline(version, source, events.HTTPPostEmitter(uri, key, pid), interval)
// NewPipeline creates a new Transporter Pipeline using the given tree of nodes and
// event emit function. It creates a metrics ticker firing every interval,
// builds the emitter on the source pipe's Event channel, and starts the error
// listener and metrics gatherer goroutines before returning. The returned
// error is always nil in the lines visible here.
//
// NOTE(review): the version argument is not assigned to the Pipeline's version
// field in the lines visible here — confirm against the full file.
// NOTE(review): NewNodeWithOptions returns two values; the example below
// assigns only one and should be refreshed.
// eg.
// a, err := adaptor.GetAdaptor("mongodb", map[string]interface{}{"uri": "mongo://localhost:27017"})
// if err != nil {
// fmt.Println(err)
// os.Exit(1)
// }
// source := pipeline.NewNodeWithOptions(
// "source", "mongo", "/.*/",
// pipeline.WithClient(a),
// pipeline.WithReader(a),
// )
// f, err := adaptor.GetAdaptor("file", map[string]interface{}{"uri": "stdout://"})
// sink := pipeline.NewNodeWithOptions(
// "out", "file", "/.*/",
// pipeline.WithClient(f),
// pipeline.WithWriter(f),
// pipeline.WithParent(source),
// )
// pipeline, err := transporter.NewPipeline("version", source, events.NewNoopEmitter(), 1*time.Second)
// if err != nil {
// fmt.Println(err)
// os.Exit(1)
// }
// pipeline.Run()
func NewPipeline(version string, source *Node, emit events.EmitFunc, interval time.Duration) (*Pipeline, error) {
pipeline := &Pipeline{
source: source,
metricsTicker: time.NewTicker(interval),
done: make(chan struct{}),
// init the emitter with the right chan
pipeline.emitter = events.NewEmitter(source.pipe.Event, emit)
// start the emitters
go pipeline.startErrorListener(source.pipe.Err)
go pipeline.startMetricsGatherer()
return pipeline, nil
// String returns the textual rendering of the pipeline's node tree, as
// produced by the source node's String method.
func (pipeline *Pipeline) String() string {
return pipeline.source.String()
// Stop sends a stop signal to the emitter and all the nodes, whether they are running or not.
// The node's database adaptors are expected to clean up after themselves, and Stop will block until
// all nodes have stopped successfully.
// NOTE(review): only the endpoint lookup and the exit event are visible in
// this excerpt; the statements that stop the nodes and emitter are not shown.
func (pipeline *Pipeline) Stop() {
endpoints := pipeline.source.Endpoints()
// pipeline has stopped, emit one last round of metrics and send the exit event
pipeline.source.pipe.Event <- events.NewExitEvent(time.Now().UnixNano(), pipeline.version, endpoints)
// Run runs the pipeline: it sends a boot event carrying the pipeline version
// and node endpoints, then starts the source node. An error from the source
// is stored in pipeline.Err only if no error has been recorded already, and
// pipeline.Err is what Run returns.
func (pipeline *Pipeline) Run() error {
endpoints := pipeline.source.Endpoints()
// send a boot event
pipeline.source.pipe.Event <- events.NewBootEvent(time.Now().UnixNano(), pipeline.version, endpoints)
// start the source
err := pipeline.source.Start()
if err != nil && pipeline.Err == nil {
pipeline.Err = err // only set it if it hasn't been set already.
return pipeline.Err
// startErrorListener consumes the errors arriving on cherr (the pipe's Err
// channel) until cherr is closed or pipeline.done fires. An adaptor.Error is
// turned into an error event and logged when its level is ERROR or CRITICAL;
// in the else branch the error is recorded in pipeline.Err if none is set yet.
// NOTE(review): closing braces and some statements are not shown in this
// excerpt; presumably the else pairs with the adaptor.Error type assertion,
// and this is where the pipeline gets stopped — confirm.
func (pipeline *Pipeline) startErrorListener(cherr chan error) {
for {
select {
case err, ok := <-cherr:
// ok is false once cherr has been closed.
if !ok {
if aerr, ok := err.(adaptor.Error); ok {
pipeline.source.pipe.Event <- events.NewErrorEvent(time.Now().UnixNano(), aerr.Path, aerr.Record, aerr.Error())
if aerr.Lvl == adaptor.ERROR || aerr.Lvl == adaptor.CRITICAL {
log.With("path", aerr.Path).Errorln(aerr)
} else {
if pipeline.Err == nil {
pipeline.Err = err
case <-pipeline.done:
// startMetricsGatherer loops on two channels: the metrics ticker and
// pipeline.done.
// NOTE(review): the case bodies are not shown in this excerpt; presumably a
// tick emits metrics and done ends the loop — confirm.
func (pipeline *Pipeline) startMetricsGatherer() {
for {
select {
case <-pipeline.metricsTicker.C:
case <-pipeline.done:
// emitMetrics sends one metrics event per node onto the source pipe's Event
// channel, carrying the node's path and its pipe's MessageCount.
func (pipeline *Pipeline) emitMetrics() {
pipeline.apply(func(node *Node) {
pipeline.source.pipe.Event <- events.NewMetricsEvent(time.Now().UnixNano(), node.path, node.pipe.MessageCount)
// apply maps a function f across all nodes of a pipeline. Nodes are taken from
// the front of a queue that starts with the source, and each node's children
// are appended to the back, giving a breadth-first order.
// NOTE(review): the call to f is not visible in this excerpt.
func (pipeline *Pipeline) apply(f func(*Node)) {
head := pipeline.source
nodes := []*Node{head}
for len(nodes) > 0 {
head, nodes = nodes[0], nodes[1:]
nodes = append(nodes, head.children...)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册