pipeline_module_callback.go 5.8 KB
Newer Older
O
ob-robot 已提交

package engine

import (
	"context"
	"sync"
	"time"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"

	"github.com/oceanbase/obagent/config/monagent"
	"github.com/oceanbase/obagent/lib/retry"
	agentlog "github.com/oceanbase/obagent/log"
)

var pipelineConfigRetry = map[string]*retry.Retry{}
var pipelineConfigMutex = sync.Mutex{}

func addPipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	logger := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("module", pipelineModule.Name)
	configManager := GetConfigManager()
	event := &configEvent{
		ctx:            ctx,
		eventType:      addConfigEvent,
		pipelineModule: pipelineModule,
		callbackChan:   make(chan *configCallbackEvent, 1),
	}

	logger.Infof("add pipeline module config event")
	configManager.eventChan <- event
	callbackEvent := <-event.callbackChan

	var err error

	logger.Infof("add pipeline module config result status: %s, description: %s", callbackEvent.execStatus, callbackEvent.description)
	switch callbackEvent.execStatus {
	case configEventExecSucceed:
		err = nil
	case configEventExecFailed:
		err = errors.Errorf("add pipeline module config %s failed, description %s", pipelineModule.Name, callbackEvent.description)
	}
	return err
}

func deletePipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	logger := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("module", pipelineModule.Name)
	configManager := GetConfigManager()
	event := &configEvent{
		ctx:            ctx,
		eventType:      deleteConfigEvent,
		pipelineModule: pipelineModule,
		callbackChan:   make(chan *configCallbackEvent, 1),
	}

	logger.Infof("delete pipeline module config event")
	configManager.eventChan <- event
	callbackEvent := <-event.callbackChan

	var err error
	logger.Infof("delete pipeline module result status: %s, description: %s", callbackEvent.execStatus, callbackEvent.description)
	switch callbackEvent.execStatus {
	case configEventExecSucceed:
		err = nil
	case configEventExecFailed:
		err = errors.Errorf("delete pipeline module failed description %s", callbackEvent.description)
	}
	return err
}

func updatePipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	logger := log.WithContext(context.WithValue(ctx, agentlog.StartTimeKey, time.Now())).WithField("module", pipelineModule.Name)
	configManager := GetConfigManager()
	event := &configEvent{
		ctx:            ctx,
		eventType:      updateConfigEvent,
		pipelineModule: pipelineModule,
		callbackChan:   make(chan *configCallbackEvent, 1),
	}

	logger.Infof("update pipeline module config event")
	configManager.eventChan <- event
	callbackEvent := <-event.callbackChan

	var err error
	logger.Infof("update pipeline module config result status: %s, description: %s", callbackEvent.execStatus, callbackEvent.description)
	switch callbackEvent.execStatus {
	case configEventExecSucceed:
		err = nil
	case configEventExecFailed:
		err = errors.Errorf("update pipeline module config failed description %s", callbackEvent.description)
	}
	return err
}

func updateOrAddPipeline(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	err := updatePipeline(ctx, pipelineModule)
	if err == nil {
		return nil
	}
	log.WithContext(ctx).WithError(err).Errorf("update pipeline module config failed, module name: %s", pipelineModule.Name)

	err = addPipeline(ctx, pipelineModule)
	if err != nil {
		log.WithContext(ctx).WithError(err).Errorf("add pipeline module config failed, module name: %s", pipelineModule.Name)
	}
	return err
}

func InitPipelineModuleCallback(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	if !pipelineModule.Status.Validate() {
		log.WithContext(ctx).Warnf("pipeline module config %s is invalid, just skip", pipelineModule.Name)
		return nil
	}
	if pipelineModule.Status == monagent.INACTIVE {
		log.WithContext(ctx).Warnf("pipeline module config %s is inactive or invalid, just skip", pipelineModule.Name)
	} else {
		notifyPipelineConfigRetryIfFailed(ctx, pipelineModule.Name, func(ctx context.Context) error {
			err := addPipeline(ctx, pipelineModule)
			if err != nil {
				return err
			}
			log.WithContext(ctx).Infof("add pipeline module config %s successfully.", pipelineModule.Name)
			return nil
		})
	}
	return nil
}

func UpdatePipelineModuleCallback(ctx context.Context, pipelineModule *monagent.PipelineModule) error {
	if !pipelineModule.Status.Validate() {
		log.WithContext(ctx).Warnf("pipeline module config %s is invalid, just skip", pipelineModule.Name)
		return nil
	}
	if pipelineModule.Status == monagent.INACTIVE {
		notifyPipelineConfigRetryIfFailed(ctx, pipelineModule.Name, func(ctx context.Context) error {
			err := deletePipeline(ctx, pipelineModule)
			if err != nil {
				return err
			}
			log.WithContext(ctx).Infof("delete pipeline module config %s successfully.", pipelineModule.Name)
			return nil
		})
	} else {
		notifyPipelineConfigRetryIfFailed(ctx, pipelineModule.Name, func(ctx context.Context) error {
			err := updateOrAddPipeline(ctx, pipelineModule)
			if err != nil {
				return err
			}
			log.WithContext(ctx).Infof("update or add pipeline module config %s successfully.", pipelineModule.Name)
			return nil
		})
	}
	return nil
}

func notifyPipelineConfigRetryIfFailed(ctx context.Context, pipeline string, retryEvent func(ctx context.Context) error) {
	pipelineConfigMutex.Lock()
	defer pipelineConfigMutex.Unlock()
	if _, ex := pipelineConfigRetry[pipeline]; !ex {
		pipelineConfigRetry[pipeline] = retry.NewRetry()
	}
	pipelineConfigRetry[pipeline].Do(ctx,
		retry.RetryConfig{
			Name:          pipeline,
			RetryEvent:    retryEvent,
			RetryTimeout:  time.Second * 30,
			MaxRetryTimes: 0,
			RetryDuration: &retry.DefaultNextDuration{
				BaseDuration: time.Second * 30,
				MaxDuration:  time.Minute * 10,
			}})
}