// Source: registry.go (4.3 KB), committed by ob-robot.
package plugins

import (
	"fmt"
	"sync"

	"github.com/pkg/errors"

	"github.com/oceanbase/obagent/config/monagent"
)

// Package-level singletons. Each manager pointer is paired with the sync.Once
// that the matching Get*Manager accessor uses to construct it a single time.
var (
	inputManager     *InputManager
	inputManagerOnce sync.Once

	processorManager     *ProcessorManager
	processorManagerOnce sync.Once

	outputManager     *OutputManager
	outputManagerOnce sync.Once

	exporterManager     *ExporterManager
	exporterManagerOnce sync.Once
)

// GetInputManager returns the package-wide InputManager. The manager is
// created with an empty registry on the first call; inputManagerOnce ensures
// the construction runs only once, so later calls return the same pointer.
func GetInputManager() *InputManager {
	build := func() {
		registry := make(map[string]func(conf *monagent.PluginConfig) (Source, error))
		inputManager = &InputManager{Registry: registry}
	}
	inputManagerOnce.Do(build)
	return inputManager
}

// GetProcessorManager returns the package-wide ProcessorManager. The manager
// is created with an empty registry on the first call; processorManagerOnce
// ensures the construction runs only once, so later calls return the same
// pointer.
func GetProcessorManager() *ProcessorManager {
	build := func() {
		registry := make(map[string]func(conf *monagent.PluginConfig) (Processor, error))
		processorManager = &ProcessorManager{Registry: registry}
	}
	processorManagerOnce.Do(build)
	return processorManager
}

// GetOutputManager returns the package-wide OutputManager. The manager is
// created with an empty registry on the first call; outputManagerOnce ensures
// the construction runs only once, so later calls return the same pointer.
func GetOutputManager() *OutputManager {
	build := func() {
		registry := make(map[string]func(conf *monagent.PluginConfig) (Sink, error))
		outputManager = &OutputManager{Registry: registry}
	}
	outputManagerOnce.Do(build)
	return outputManager
}

// GetExporterManager returns the package-wide ExporterManager. The manager is
// created with an empty registry on the first call; exporterManagerOnce
// ensures the construction runs only once, so later calls return the same
// pointer.
func GetExporterManager() *ExporterManager {
	build := func() {
		registry := make(map[string]func(conf *monagent.PluginConfig) (Sink, error))
		exporterManager = &ExporterManager{Registry: registry}
	}
	exporterManagerOnce.Do(build)
	return exporterManager
}

// InputManager is responsible for managing and creating input plugin
// instances. It keeps a registry of factory functions keyed by plugin name.
type InputManager struct {
	// Registry maps a plugin name to the factory that builds a Source from a
	// plugin configuration. Writing to it requires a non-nil map.
	Registry map[string]func(conf *monagent.PluginConfig) (Source, error)
}

// ProcessorManager is responsible for managing and creating processor plugin
// instances. It keeps a registry of factory functions keyed by plugin name.
type ProcessorManager struct {
	// Registry maps a plugin name to the factory that builds a Processor from
	// a plugin configuration. Writing to it requires a non-nil map.
	Registry map[string]func(conf *monagent.PluginConfig) (Processor, error)
}

// OutputManager is responsible for managing and creating output plugin
// instances. It keeps a registry of factory functions keyed by plugin name.
type OutputManager struct {
	// Registry maps a plugin name to the factory that builds a Sink from a
	// plugin configuration. Writing to it requires a non-nil map.
	Registry map[string]func(conf *monagent.PluginConfig) (Sink, error)
}

// ExporterManager is responsible for managing and creating exporter plugin
// instances. It keeps a registry of factory functions keyed by plugin name.
// Its factories return a Sink, the same result type OutputManager uses.
type ExporterManager struct {
	// Registry maps a plugin name to the factory that builds a Sink from a
	// plugin configuration. Writing to it requires a non-nil map.
	Registry map[string]func(conf *monagent.PluginConfig) (Sink, error)
}

// Register stores the input plugin factory f in the registry under name.
// It panics if a factory is already registered under that name.
func (m *InputManager) Register(name string, f func(conf *monagent.PluginConfig) (Source, error)) {
	if _, taken := m.Registry[name]; taken {
		msg := fmt.Sprintf("input plugin %s already registered", name)
		panic(msg)
	}
	m.Registry[name] = f
}

// Register stores the processor plugin factory f in the registry under name.
// It panics if a factory is already registered under that name.
func (m *ProcessorManager) Register(name string, f func(conf *monagent.PluginConfig) (Processor, error)) {
	if _, taken := m.Registry[name]; taken {
		msg := fmt.Sprintf("processor plugin %s already registered", name)
		panic(msg)
	}
	m.Registry[name] = f
}

// Register stores the output plugin factory f in the registry under name.
// It panics if a factory is already registered under that name.
func (m *OutputManager) Register(name string, f func(conf *monagent.PluginConfig) (Sink, error)) {
	if _, taken := m.Registry[name]; taken {
		msg := fmt.Sprintf("output plugin %s already registered", name)
		panic(msg)
	}
	m.Registry[name] = f
}

// Register stores the exporter plugin factory f in the registry under name.
// It panics if a factory is already registered under that name.
func (m *ExporterManager) Register(name string, f func(conf *monagent.PluginConfig) (Sink, error)) {
	if _, taken := m.Registry[name]; taken {
		msg := fmt.Sprintf("exporter plugin %s already registered", name)
		panic(msg)
	}
	m.Registry[name] = f
}

// GetPlugin looks up the input plugin factory registered under name and
// calls it with conf, returning the factory's own result. If nothing is
// registered under name, it returns a nil Source and an error.
func (m *InputManager) GetPlugin(name string, conf *monagent.PluginConfig) (Source, error) {
	if factory, found := m.Registry[name]; found {
		return factory(conf)
	}
	return nil, errors.Errorf("input plugin %s not exist", name)
}

// GetPlugin looks up the processor plugin factory registered under name and
// calls it with conf, returning the factory's own result. If nothing is
// registered under name, it returns a nil Processor and an error.
func (m *ProcessorManager) GetPlugin(name string, conf *monagent.PluginConfig) (Processor, error) {
	if factory, found := m.Registry[name]; found {
		return factory(conf)
	}
	return nil, errors.Errorf("processor plugin %s not exist", name)
}

// GetPlugin looks up the output plugin factory registered under name and
// calls it with conf, returning the factory's own result. If nothing is
// registered under name, it returns a nil Sink and an error.
func (m *OutputManager) GetPlugin(name string, conf *monagent.PluginConfig) (Sink, error) {
	if factory, found := m.Registry[name]; found {
		return factory(conf)
	}
	return nil, errors.Errorf("output plugin %s not exist", name)
}

// GetPlugin looks up the exporter plugin factory registered under name and
// calls it with conf, returning the factory's own result. If nothing is
// registered under name, it returns a nil Sink and an error.
func (m *ExporterManager) GetPlugin(name string, conf *monagent.PluginConfig) (Sink, error) {
	if factory, found := m.Registry[name]; found {
		return factory(conf)
	}
	return nil, errors.Errorf("exporter plugin %s not exist", name)
}