/*
 * Copyright (c) 2023 OceanBase
 * OCP Express is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

package config

import (
	"context"
	"strings"
	"sync"

	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"

	"github.com/oceanbase/obagent/executor/agent"
)

var (
	// processConfigNotifyAddresses maps a process name to the notify
	// configuration registered for it. It is created at declaration time, so
	// it is already usable when any init function of this package runs.
	processConfigNotifyAddresses = make(map[string]ProcessConfigNotifyAddress, 4)
	// processConfigNotifyAddressesLock guards processConfigNotifyAddresses.
	processConfigNotifyAddressesLock sync.Mutex
)

// ProcessConfigNotifyAddress business process configuration information.
// This information serves as sdk source data for the mgragent and agentctl.
W
wangzelin.wzl 已提交
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
type ProcessConfigNotifyAddress struct {
	Local         bool            `yaml:"local"`
	Process       string          `yaml:"process"`
	NotifyAddress string          `yaml:"notifyAddress"`
	AuthConfig    BasicAuthConfig `yaml:"authConfig"`
}

// getProcessModuleConfigNotifyAddress looks up the notify configuration
// registered for process. The boolean result reports whether an entry exists;
// when it is false the returned struct is the zero value.
func getProcessModuleConfigNotifyAddress(process string) (ProcessConfigNotifyAddress, bool) {
	processConfigNotifyAddressesLock.Lock()
	addr, found := processConfigNotifyAddresses[process]
	processConfigNotifyAddressesLock.Unlock()
	return addr, found
}

// SetProcessModuleConfigNotifyAddress registers notifyConfig under its Process
// name, replacing any entry previously stored for that process.
func SetProcessModuleConfigNotifyAddress(notifyConfig ProcessConfigNotifyAddress) {
	key := notifyConfig.Process

	processConfigNotifyAddressesLock.Lock()
	defer processConfigNotifyAddressesLock.Unlock()

	processConfigNotifyAddresses[key] = notifyConfig
}

// InitModuleConfig runs the init-config callback registered for module.
// It returns an error when no callback is registered for the module;
// otherwise it returns whatever the callback returns.
func InitModuleConfig(ctx context.Context, module string) error {
	if cb, found := getModuleCallback(module); found {
		return cb.InitConfigCallback(ctx, module)
	}
	return errors.Errorf("module %s callback is not found", module)
}

// InitModuleTypeConfig init module type config, then the init callback will be trigger
func InitModuleTypeConfig(ctx context.Context, moduleType ModuleType) error {
	var errs []string
	for m, t := range modules {
		if t == moduleType {
			err := InitModuleConfig(ctx, m)
			if err != nil {
O
ob-robot 已提交
73
				errs = append(errs, errors.Errorf("init module %s err:%s", m, err).Error())
W
wangzelin.wzl 已提交
74 75 76 77 78 79 80 81 82 83 84 85 86 87
			}
		}
	}
	if len(errs) > 0 {
		return errors.New(strings.Join(errs, ","))
	}
	return nil
}

// NotifyModuleConfigs notify module config changed
func NotifyModuleConfigs(ctx context.Context, verifyConfigResult *VerifyConfigResult) error {
	if verifyConfigResult == nil {
		return nil
	}
O
ob-robot 已提交
88 89
	log.WithContext(ctx).Infof("notify module configs length:%d", len(verifyConfigResult.UpdatedConfigs))
	var errs []string
W
wangzelin.wzl 已提交
90 91 92
	for _, conf := range verifyConfigResult.UpdatedConfigs {
		err := notifyModuleConfig(ctx, conf)
		if err != nil {
O
ob-robot 已提交
93 94
			log.WithContext(ctx).Errorf("notify module %s err:%+v", conf.Module, err)
			errs = append(errs, err.Error())
W
wangzelin.wzl 已提交
95 96
		}
	}
O
ob-robot 已提交
97 98 99
	if len(errs) > 0 {
		return errors.Errorf("notify modules err:%s", strings.Join(errs, ","))
	}
W
wangzelin.wzl 已提交
100 101 102 103 104
	return nil
}

// notify module config changed
func notifyModuleConfig(ctx context.Context, econfig *NotifyModuleConfig) error {
O
ob-robot 已提交
105
	log.WithContext(ctx).Infof("module %s, notify process %s, current process %s", econfig.Module, econfig.Process, CurProcess)
W
wangzelin.wzl 已提交
106 107 108 109 110 111 112
	notifyAddress, ex := getProcessModuleConfigNotifyAddress(string(econfig.Process))
	if !ex {
		return errors.Errorf("process %s notify config is not found", econfig.Process)
	}
	if !notifyAddress.Local && econfig.Process != CurProcess {
		return NotifyRemoteModuleConfig(ctx, econfig)
	}
O
ob-robot 已提交
113
	return NotifyLocalModuleConfig(ctx, econfig)
W
wangzelin.wzl 已提交
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
}

// NotifyLocalModuleConfig hands econfig to the notify callback registered for
// econfig.Module in the current process. It returns an error when no callback
// is registered for that module; otherwise it returns the callback's result.
func NotifyLocalModuleConfig(ctx context.Context, econfig *NotifyModuleConfig) error {
	if cb, found := getModuleCallback(econfig.Module); found {
		return cb.NotifyConfigCallback(ctx, econfig)
	}
	return errors.Errorf("module %s not found", econfig.Module)
}

// NotifyRemoteModuleConfig notify remote process 's module config changed
func NotifyRemoteModuleConfig(ctx context.Context, econfig *NotifyModuleConfig) error {
	notifyAddress, ex := getProcessModuleConfigNotifyAddress(string(econfig.Process))
	if !ex {
		return errors.Errorf("module %s, process %s, config notify address is not found.", econfig.Module, econfig.Process)
	}
	ctxlog := log.WithContext(ctx).WithFields(log.Fields{
		"module":         econfig.Module,
		"process":        econfig.Process,
		"notify address": notifyAddress.NotifyAddress,
	})
	ctxlog.Infof("notify module config")

O
ob-robot 已提交
138 139
	admin := agent.NewAdmin(agent.DefaultAdminConf())
	client, err := admin.NewClient(notifyAddress.Process)
W
wangzelin.wzl 已提交
140
	if err != nil {
O
ob-robot 已提交
141
		ctxlog.Errorf("new client err:%+v", err)
W
wangzelin.wzl 已提交
142 143 144
		return err
	}

O
ob-robot 已提交
145 146
	resp := new(agent.AgentctlResponse)
	err = client.Call(notifyAddress.NotifyAddress, econfig, resp)
W
wangzelin.wzl 已提交
147
	if err != nil {
O
ob-robot 已提交
148
		ctxlog.Errorf("notify config err:%+v", err)
W
wangzelin.wzl 已提交
149 150
		return err
	}
O
ob-robot 已提交
151 152
	return nil
}
// NotifyAllModules notifies every module config without filtering by module
// name.
func NotifyAllModules(ctx context.Context) error {
	// No name filter is needed because the "all" flag is set.
	var noFilter []string
	return notifyModules(ctx, noFilter, true)
}
func NotifyModules(ctx context.Context, modules []string) error {
	return notifyModules(ctx, modules, false)
W
wangzelin.wzl 已提交
160 161
}

func notifyModules(ctx context.Context, modules []string, all bool) error {
W
wangzelin.wzl 已提交
163
	moduleConfigs := GetModuleConfigs()
O
ob-robot 已提交
164 165 166 167 168 169 170 171 172 173 174 175 176 177
	modulesToNotify := make([]*NotifyModuleConfig, 0, len(moduleConfigs))
	for _, moduleConfTpl := range moduleConfigs {
		if moduleConfTpl.Disabled {
			log.WithContext(ctx).Infof("module %s config is disabled", moduleConfTpl.Module)
			continue
		}
		affected := false
		if all {
			affected = true
		} else {
			for _, module := range modules {
				if moduleConfTpl.Module == module {
					affected = true
					break
W
wangzelin.wzl 已提交
178 179 180
				}
			}
		}
O
ob-robot 已提交
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
		if !affected {
			continue
		}

		process := moduleConfTpl.Process
		moduleConf, err := GetFinalModuleConfig(moduleConfTpl.Module)
		if err != nil {
			log.WithContext(ctx).Error(err)
			return err
		}
		modulesToNotify = append(modulesToNotify, &NotifyModuleConfig{
			Process: process,
			Module:  moduleConfTpl.Module,
			Config:  moduleConf.Config,
		})
W
wangzelin.wzl 已提交
196
	}
O
ob-robot 已提交
197
	err := NotifyModuleConfigs(ctx, &VerifyConfigResult{
W
wangzelin.wzl 已提交
198
		ConfigVersion:  nil,
O
ob-robot 已提交
199
		UpdatedConfigs: modulesToNotify,
W
wangzelin.wzl 已提交
200 201
	})
	if err != nil {
O
ob-robot 已提交
202
		log.WithContext(ctx).Errorf("notify module configs %+v, err:%+v", modulesToNotify, err)
W
wangzelin.wzl 已提交
203 204 205 206 207 208
	}
	return err
}

func NotifyModuleConfigForHttp(ctx context.Context, nconfig *NotifyModuleConfig) error {
	ctxlog := log.WithContext(ctx)
O
ob-robot 已提交
209
	err := ReloadConfigFromFiles(ctx)
W
wangzelin.wzl 已提交
210
	if err != nil {
O
ob-robot 已提交
211
		return errors.Errorf("reload config err:%s", err)
W
wangzelin.wzl 已提交
212 213 214 215
	}

	moduleConf, err := GetFinalModuleConfig(nconfig.Module)
	if err != nil {
O
ob-robot 已提交
216
		return errors.Errorf("get module config err:%s", err)
W
wangzelin.wzl 已提交
217 218
	}

O
ob-robot 已提交
219
	if nconfig.Process != moduleConf.Process {
W
wangzelin.wzl 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233
		ctxlog.Warnf("module %s process should be %s, not %s", nconfig.Module, moduleConf.Process, nconfig.Process)
	}

	if moduleConf.Disabled {
		return errors.Errorf("module %s config is disabled.", nconfig.Module)
	}

	if moduleConf.Process != string(CurProcess) {
		return errors.Errorf("process %s is not reached, cur process is %s", moduleConf.Process, CurProcess)
	}

	nconfig.Config = moduleConf.Config
	err = NotifyLocalModuleConfig(ctx, nconfig)
	if err != nil {
O
ob-robot 已提交
234
		return errors.Errorf("notify module config err:%s", err)
W
wangzelin.wzl 已提交
235 236 237 238
	}

	return nil
}