route_manager.go 2.5 KB
Newer Older
O
ob-robot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
package engine

import (
	"container/list"
	"context"
	"github.com/oceanbase/obagent/errors"
	"net/http"
	"reflect"
	"sync"
)

// PipelineRouteHandler responsible for transferring the pipeline exposeUrl and handler
type PipelineRouteHandler struct {
	Ctx         context.Context
	ExposeUrl   string
	FuncHandler func(http.Handler) http.Handler
}

var PipelineRouteChan = make(chan *PipelineRouteHandler, 10)

// RouteManager responsible for managing the pipeline corresponding to the url
type RouteManager struct {
	routeMap map[string]*list.List
	rwMutex  sync.RWMutex
}

var routeManager *RouteManager
var routeManagerOnce sync.Once

// GetRouteManager get route manager singleton
func GetRouteManager() *RouteManager {

	routeManagerOnce.Do(func() {
		routeManager = &RouteManager{
			routeMap: make(map[string]*list.List, 16),
			rwMutex:  sync.RWMutex{},
		}
	})
	return routeManager
}

// GetPipelineGroup get the pipeline group corresponding to the route
func (r *RouteManager) GetPipelineGroup(route string) (*list.List, error) {
	r.rwMutex.RLock()
	defer r.rwMutex.RUnlock()

	l, exist := r.routeMap[route]
	if !exist {
		return nil, errors.New("route path is not exist")
	}
	copyList := list.New()
	copyList.PushBackList(l)
	return copyList, nil
}

// AddPipelineGroup add data to the route
func (r *RouteManager) AddPipelineGroup(route string, data interface{}) {
	r.rwMutex.Lock()
	defer r.rwMutex.Unlock()

	_, exist := r.routeMap[route]
	if !exist {
		r.routeMap[route] = list.New()
	}
	r.routeMap[route].PushBack(data)
}

// delPipelineFromPipelineGroup delete pipeline instance to the route
func (r *RouteManager) DeletePipelineGroup(route string, data interface{}) error {
	r.rwMutex.Lock()
	defer r.rwMutex.Unlock()

	var element *list.Element
	l, exist := r.routeMap[route]
	if !exist {
		return errors.New("route path is not exist")
	}
	for e := l.Front(); e != nil; e = e.Next() {
		if reflect.DeepEqual(e.Value, data) {
			element = e
			break
		}
	}

	if element != nil {
		l.Remove(element)
	} else {
		return errors.New("pipeline is not exist")
	}
	return nil
}

// RegisterHTTPRoute register http route
func (r *RouteManager) RegisterHTTPRoute(ctx context.Context, exposeURL string, handler http.Handler) {
	r.rwMutex.Lock()
	defer r.rwMutex.Unlock()
	if _, exist := r.routeMap[exposeURL]; !exist {
		handlerFunc := func(h http.Handler) http.Handler {
			return handler
		}
		pipelineRoute := &PipelineRouteHandler{
			Ctx:         ctx,
			ExposeUrl:   exposeURL,
			FuncHandler: handlerFunc,
		}
		PipelineRouteChan <- pipelineRoute
		r.routeMap[exposeURL] = list.New()
	}
}