pipeline_module_callback.go 5.8 KB
Newer Older
O
ob-robot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
package engine

import (
	"context"
	"sync"
	"time"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"

	"github.com/oceanbase/obagent/config/monagent"
	"github.com/oceanbase/obagent/lib/retry"
	agentlog "github.com/oceanbase/obagent/log"
)

var pipelineConfigRetry = map[string]*retry.Retry{}
var pipelineConfigMutex = sync.Mutex{}

func addPipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	logger := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("module", pipelineModule.Name)
	configManager := GetConfigManager()
	event := &configEvent{
		ctx:            ctx,
		eventType:      addConfigEvent,
		pipelineModule: pipelineModule,
		callbackChan:   make(chan *configCallbackEvent, 1),
	}

	logger.Infof("add pipeline module config event")
	configManager.eventChan <- event
	callbackEvent := <-event.callbackChan

	var err error

	logger.Infof("add pipeline module config result status: %s, description: %s", callbackEvent.execStatus, callbackEvent.description)
	switch callbackEvent.execStatus {
	case configEventExecSucceed:
		err = nil
	case configEventExecFailed:
		err = errors.Errorf("add pipeline module config %s failed, description %s", pipelineModule.Name, callbackEvent.description)
	}
	return err
}

func deletePipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	logger := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("module", pipelineModule.Name)
	configManager := GetConfigManager()
	event := &configEvent{
		ctx:            ctx,
		eventType:      deleteConfigEvent,
		pipelineModule: pipelineModule,
		callbackChan:   make(chan *configCallbackEvent, 1),
	}

	logger.Infof("delete pipeline module config event")
	configManager.eventChan <- event
	callbackEvent := <-event.callbackChan

	var err error
	logger.Infof("delete pipeline module result status: %s, description: %s", callbackEvent.execStatus, callbackEvent.description)
	switch callbackEvent.execStatus {
	case configEventExecSucceed:
		err = nil
	case configEventExecFailed:
		err = errors.Errorf("delete pipeline module failed description %s", callbackEvent.description)
	}
	return err
}

func updatePipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	logger := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("module", pipelineModule.Name)
	configManager := GetConfigManager()
	event := &configEvent{
		ctx:            ctx,
		eventType:      updateConfigEvent,
		pipelineModule: pipelineModule,
		callbackChan:   make(chan *configCallbackEvent, 1),
	}

	logger.Infof("update pipeline module config event")
	configManager.eventChan <- event
	callbackEvent := <-event.callbackChan

	var err error
	logger.Infof("update pipeline module config result status: %s, description: %s", callbackEvent.execStatus, callbackEvent.description)
	switch callbackEvent.execStatus {
	case configEventExecSucceed:
		err = nil
	case configEventExecFailed:
		err = errors.Errorf("update pipeline module config failed description %s", callbackEvent.description)
	}
	return err
}

func updateOrAddPipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	err := updatePipeline(ctx, pipelineModule)
	if err == nil {
		return nil
	}
	log.WithContext(ctx).WithError(err).Errorf("update pipeline module config failed, module name: %s", pipelineModule.Name)

	err = addPipeline(ctx, pipelineModule)
	if err != nil {
		log.WithContext(ctx).WithError(err).Errorf("add pipeline module config failed, module name: %s", pipelineModule.Name)
	}
	return err
}

func InitPipelineModuleCallback(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	if !pipelineModule.Status.Validate() {
		log.WithContext(ctx).Warnf("pipeline module config %s is invalid, just skip", pipelineModule.Name)
		return nil
	}
	if pipelineModule.Status == monagent.INACTIVE {
		log.WithContext(ctx).Warnf("pipeline module config %s is inactive or invalid, just skip", pipelineModule.Name)
	} else {
		notifyPipelineConfigRetryIfFailed(ctx, pipelineModule.Name, func(ctx context.Context) error {
			err := addPipeline(ctx, pipelineModule)
			if err != nil {
				return err
			}
			log.WithContext(ctx).Infof("add pipeline module config %s successfully.", pipelineModule.Name)
			return nil
		})
	}
	return nil
}

func UpdatePipelineModuleCallback(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	if !pipelineModule.Status.Validate() {
		log.WithContext(ctx).Warnf("pipeline module config %s is invalid, just skip", pipelineModule.Name)
		return nil
	}
	if pipelineModule.Status == monagent.INACTIVE {
		notifyPipelineConfigRetryIfFailed(ctx, pipelineModule.Name, func(ctx context.Context) error {
			err := deletePipeline(ctx, pipelineModule)
			if err != nil {
				return err
			}
			log.WithContext(ctx).Infof("delete pipeline module config %s successfully.", pipelineModule.Name)
			return nil
		})
	} else {
		notifyPipelineConfigRetryIfFailed(ctx, pipelineModule.Name, func(ctx context.Context) error {
			err := updateOrAddPipeline(ctx, pipelineModule)
			if err != nil {
				return err
			}
			log.WithContext(ctx).Infof("update or add pipeline module config %s successfully.", pipelineModule.Name)
			return nil
		})
	}
	return nil
}

func notifyPipelineConfigRetryIfFailed(ctx context.Context, pipeline string, retryEvent func(ctx context.Context) error) {
	pipelineConfigMutex.Lock()
	defer pipelineConfigMutex.Unlock()
	if _, ex := pipelineConfigRetry[pipeline]; !ex {
		pipelineConfigRetry[pipeline] = retry.NewRetry()
	}
	pipelineConfigRetry[pipeline].Do(ctx,
		retry.RetryConfig{
			Name:          pipeline,
			RetryEvent:    retryEvent,
			RetryTimeout:  time.Second * 30,
			MaxRetryTimes: 0,
			RetryDuration: &retry.DefaultNextDuration{
				BaseDuration: time.Second * 30,
				MaxDuration:  time.Minute * 10,
			}})
}