未验证 提交 c731a344 编写于 作者: R Rongfeng Fu 提交者: GitHub

V1.2.0 (#70)

* fix #61

* fix #53

* v1.2.0

* update txt

* lock O

* obagent reload

* update autodeploy
上级 4e0360e8
......@@ -36,7 +36,7 @@ from tool import DirectoryUtil, FileUtil
ROOT_IO = IO(1)
VERSION = '1.1.2'
VERSION = '<VERSION>'
REVISION = '<CID>'
BUILD_BRANCH = '<B_BRANCH>'
BUILD_TIME = '<B_TIME>'
......@@ -90,6 +90,7 @@ class BaseCommand(object):
class ObdCommand(BaseCommand):
OBD_PATH = os.path.join(os.environ.get('OBD_HOME', os.getenv('HOME')), '.obd')
OBD_INSTALL_PRE = os.environ.get('OBD_INSTALL_PRE', '/')
def init_home(self):
version_path = os.path.join(self.OBD_PATH, 'version')
......@@ -100,13 +101,13 @@ class ObdCommand(BaseCommand):
if VERSION != version:
obd_plugin_path = os.path.join(self.OBD_PATH, 'plugins')
if DirectoryUtil.mkdir(self.OBD_PATH):
root_plugin_path = '/usr/obd/plugins'
root_plugin_path = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/plugins')
if os.path.exists(root_plugin_path):
DirectoryUtil.copy(root_plugin_path, obd_plugin_path, ROOT_IO)
obd_mirror_path = os.path.join(self.OBD_PATH, 'mirror')
obd_remote_mirror_path = os.path.join(self.OBD_PATH, 'mirror/remote')
if DirectoryUtil.mkdir(obd_mirror_path):
root_remote_mirror = '/usr/obd/mirror/remote'
root_remote_mirror = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/mirror/remote')
if os.path.exists(root_remote_mirror):
DirectoryUtil.copy(root_remote_mirror, obd_remote_mirror_path, ROOT_IO)
version_fobj.seek(0)
......@@ -137,7 +138,7 @@ class ObdCommand(BaseCommand):
except NotImplementedError:
ROOT_IO.exception('command \'%s\' is not implemented' % self.prev_cmd)
except IOError:
ROOT_IO.exception('OBD is running.')
ROOT_IO.exception('Another app is currently holding the obd lock.')
except SystemExit:
pass
except:
......@@ -247,18 +248,18 @@ class MirrorListCommand(ObdCommand):
else:
repos = obd.mirror_manager.get_mirrors()
for repo in repos:
if repo.name == name:
if repo.section_name == name:
pkgs = repo.get_all_pkg_info()
self.show_pkg(name, pkgs)
return True
ROOT_IO.error('No such mirror repository: %s' % name)
return False
else:
repos = obd.mirror_manager.get_mirrors()
repos = obd.mirror_manager.get_mirrors(is_enabled=None)
ROOT_IO.print_list(
repos,
['Name', 'Type', 'Update Time'],
lambda x: [x.name, x.mirror_type.value, time.strftime("%Y-%m-%d %H:%M", time.localtime(x.repo_age))],
repos,
['SectionName', 'Type', 'Enabled','Update Time'],
lambda x: [x.section_name, x.mirror_type.value, x.enabled, time.strftime("%Y-%m-%d %H:%M", time.localtime(x.repo_age))],
title='Mirror Repository List'
)
return True
......@@ -271,17 +272,39 @@ class MirrorUpdateCommand(ObdCommand):
def _do_command(self, obd):
success = True
repos = obd.mirror_manager.get_remote_mirrors()
for repo in repos:
current = int(time.time())
mirrors = obd.mirror_manager.get_remote_mirrors()
for mirror in mirrors:
try:
success = repo.update_mirror() and success
if mirror.enabled and mirror.repo_age < current:
success = mirror.update_mirror() and success
except:
success = False
ROOT_IO.stop_loading('fail')
ROOT_IO.exception('Fail to synchronize mirorr (%s)' % repo.name)
ROOT_IO.exception('Fail to synchronize mirorr (%s)' % mirror.name)
return success
class MirrorEnableCommand(ObdCommand):
def __init__(self):
super(MirrorEnableCommand, self).__init__('enable', 'Enable remote mirror repository.')
def _do_command(self, obd):
name = self.cmds[0]
obd.mirror_manager.set_remote_mirror_enabled(name, True)
class MirrorDisableCommand(ObdCommand):
def __init__(self):
super(MirrorDisableCommand, self).__init__('disable', 'Disable remote mirror repository.')
def _do_command(self, obd):
name = self.cmds[0]
obd.mirror_manager.set_remote_mirror_enabled(name, False)
class MirrorMajorCommand(MajorCommand):
def __init__(self):
......@@ -290,6 +313,8 @@ class MirrorMajorCommand(MajorCommand):
self.register_command(MirrorCloneCommand())
self.register_command(MirrorCreateCommand())
self.register_command(MirrorUpdateCommand())
self.register_command(MirrorEnableCommand())
self.register_command(MirrorDisableCommand())
class RepositoryListCommand(ObdCommand):
......@@ -428,10 +453,12 @@ class ClusterRestartCommand(ClusterMirrorCommand):
super(ClusterRestartCommand, self).__init__('restart', 'Restart a started cluster.')
self.parser.add_option('-s', '--servers', type='string', help="List the started servers. Multiple servers are separated with commas.")
self.parser.add_option('-c', '--components', type='string', help="List the started components. Multiple components are separated with commas.")
self.parser.add_option('--without-parameter', '--wop', action='store_true', help='Start without parameters.')
self.parser.add_option('--with-parameter', '--wp', action='store_true', help='Restart with parameters.')
def _do_command(self, obd):
if self.cmds:
if not getattr(self.opts, 'with_parameter', False):
setattr(self.opts, 'without_parameter', True)
return obd.restart_cluster(self.cmds[0], self.opts)
else:
return self._show_help()
......@@ -491,7 +518,12 @@ class CLusterUpgradeCommand(ClusterMirrorCommand):
def __init__(self):
super(CLusterUpgradeCommand, self).__init__('upgrade', 'Upgrade a cluster.')
self.parser.add_option('-f', '--force', action='store_true', help="Force upgrade.")
self.parser.add_option('-c', '--components', type='string', help="List the updated components. Multiple components are separated with commas.")
self.parser.add_option('-c', '--component', type='string', help="Component name to upgrade.")
self.parser.add_option('-V', '--version', type='string', help="Target version.")
self.parser.add_option('--skip-check', action='store_true', help="Skip all the possible checks.")
self.parser.add_option('--usable', type='string', help="Hash list for priority mirrors, separated with `,`.", default='')
self.parser.add_option('--disable', type='string', help="Hash list for disabled mirrors, separated with `,`.", default='')
self.parser.add_option('-e', '--executer-path', type='string', help="Executer path.", default=os.path.join(ObdCommand.OBD_INSTALL_PRE, 'usr/obd/lib/executer'))
def _do_command(self, obd):
if self.cmds:
......@@ -627,7 +659,7 @@ class SysBenchCommand(TestMirrorCommand):
self.parser.add_option('--database', type='string', help='Database for a test. [test]', default='test')
self.parser.add_option('--obclient-bin', type='string', help='OBClient bin path. [obclient]', default='obclient')
self.parser.add_option('--sysbench-bin', type='string', help='Sysbench bin path. [sysbench]', default='sysbench')
self.parser.add_option('--script-name', type='string', help='Sysbench lua script file name. [point_select]', default='oltp_point_select.lua')
self.parser.add_option('--script-name', type='string', help='Sysbench lua script file name. [oltp_point_select]', default='oltp_point_select.lua')
self.parser.add_option('--sysbench-script-dir', type='string', help='The directory of the sysbench lua script file. [/usr/sysbench/share/sysbench]', default='/usr/sysbench/share/sysbench')
self.parser.add_option('--table-size', type='int', help='Number of data initialized per table. [20000]', default=20000)
self.parser.add_option('--tables', type='int', help='Number of initialization tables. [30]', default=30)
......@@ -704,7 +736,7 @@ class UpdateCommand(ObdCommand):
return super(UpdateCommand, self).do_command()
def _do_command(self, obd):
return obd.update_obd(VERSION)
return obd.update_obd(VERSION, self.OBD_INSTALL_PRE)
class MainCommand(MajorCommand):
......@@ -735,7 +767,7 @@ if __name__ == '__main__':
pass
reload(sys)
sys.setdefaultencoding(defaultencoding)
sys.path.append('/usr/obd/lib/site-packages')
sys.path.append(os.path.join(ObdCommand.OBD_INSTALL_PRE, 'usr/obd/lib/site-packages'))
ROOT_IO.track_limit += 2
if MainCommand().init('obd', sys.argv[1:]).do_command():
ROOT_IO.exit(0)
......
......@@ -22,9 +22,11 @@ from __future__ import absolute_import, division, print_function
import os
import re
import pickle
import getpass
from copy import deepcopy
from enum import Enum
from collections import OrderedDict
from tool import ConfigUtil, FileUtil, YamlLoader
from _manager import Manager
......@@ -228,6 +230,25 @@ class ClusterConfig(object):
self._default_conf[key] = self._temp_conf[key].default
self.set_global_conf(self._global_conf) # 更新全局配置
def check_param(self):
error = []
if self._temp_conf:
error += self._check_param(self._global_conf)
for server in self._server_conf:
error += self._check_param(self._server_conf[server])
return not error, set(error)
def _check_param(self, config):
error = []
for key in config:
item = self._temp_conf.get(key)
if item:
try:
item.check_value(config[key])
except Exception as e:
error.append(str(e))
return error
def set_global_conf(self, conf):
self._original_global_conf = deepcopy(conf)
self._global_conf = deepcopy(self._default_conf)
......@@ -270,6 +291,7 @@ class DeployStatus(Enum):
STATUS_STOPPED = 'stopped'
STATUS_DESTROYING = 'destroying'
STATUS_DESTROYED = 'destroyed'
STATUS_UPRADEING = 'upgrading'
class DeployConfigStatus(Enum):
......@@ -282,7 +304,7 @@ class DeployConfigStatus(Enum):
class DeployInfo(object):
def __init__(self, name, status, components={}, config_status=DeployConfigStatus.UNCHNAGE):
def __init__(self, name, status, components=OrderedDict(), config_status=DeployConfigStatus.UNCHNAGE):
self.status = status
self.name = name
self.components = components
......@@ -301,7 +323,7 @@ class DeployConfig(object):
self._user = None
self.unuse_lib_repository = False
self.auto_create_tenant = False
self.components = {}
self.components = OrderedDict()
self._src_data = None
self.yaml_path = yaml_path
self.yaml_loader = yaml_loader
......@@ -325,6 +347,29 @@ class DeployConfig(object):
return self._dump()
return True
def update_component_package_hash(self, component, package_hash, version=None):
if component not in self.components:
return False
ori_data = self._src_data[component]
src_data = deepcopy(ori_data)
src_data['package_hash'] = package_hash
if version:
src_data['version'] = version
elif 'version' in src_data:
del src_data['version']
if 'tag' in src_data:
del src_data['tag']
self._src_data[component] = src_data
if self._dump():
cluster_config = self.components[component]
cluster_config.package_hash = src_data.get('package_hash')
cluster_config.version = src_data.get('version')
cluster_config.tag = None
return True
self._src_data[component] = ori_data
return False
def _load(self):
try:
with open(self.yaml_path, 'rb') as f:
......@@ -461,6 +506,7 @@ class Deploy(object):
DEPLOY_STATUS_FILE = '.data'
DEPLOY_YAML_NAME = 'config.yaml'
UPRADE_META_NAME = '.upgrade'
def __init__(self, config_dir, stdio=None):
self.config_dir = config_dir
......@@ -468,6 +514,7 @@ class Deploy(object):
self._info = None
self._config = None
self.stdio = stdio
self._uprade_meta = None
def use_model(self, name, repository, dump=True):
self.deploy_info.components[name] = {
......@@ -484,6 +531,10 @@ class Deploy(object):
def get_deploy_yaml_path(path):
return os.path.join(path, Deploy.DEPLOY_YAML_NAME)
@staticmethod
def get_upgrade_meta_path(path):
return os.path.join(path, Deploy.UPRADE_META_NAME)
@staticmethod
def get_temp_deploy_yaml_path(path):
return os.path.join(path, 'tmp_%s' % Deploy.DEPLOY_YAML_NAME)
......@@ -498,7 +549,7 @@ class Deploy(object):
self._info = DeployInfo(
data['name'],
getattr(DeployStatus, data['status'], DeployStatus.STATUS_CONFIGURED),
ConfigUtil.get_value_from_dict(data, 'components', {}),
ConfigUtil.get_value_from_dict(data, 'components', OrderedDict()),
getattr(DeployConfigStatus, ConfigUtil.get_value_from_dict(data, 'config_status', '_'), DeployConfigStatus.UNCHNAGE),
)
except:
......@@ -525,6 +576,26 @@ class Deploy(object):
pass
return self._config
def _get_uprade_meta(self):
if self._uprade_meta is None and self.deploy_info.status == DeployStatus.STATUS_UPRADEING:
try:
path = self.get_upgrade_meta_path(self.config_dir)
with open(path) as f:
self._uprade_meta = yaml.load(f)
except:
self.stdio and getattr(self.stdio, 'exception', print)('fail to load uprade meta data')
return self._uprade_meta
@property
def upgrade_ctx(self):
uprade_meta = self._get_uprade_meta()
return uprade_meta.get('uprade_ctx') if uprade_meta else None
@property
def upgrading_component(self):
uprade_meta = self._get_uprade_meta()
return uprade_meta.get('component') if uprade_meta else None
def apply_temp_deploy_config(self):
src_yaml_path = self.get_temp_deploy_yaml_path(self.config_dir)
target_src_path = self.get_deploy_yaml_path(self.config_dir)
......@@ -554,18 +625,85 @@ class Deploy(object):
self.stdio and getattr(self.stdio, 'exception', print)('dump deploy info to %s failed' % path)
return False
def _update_deploy_status(self, status):
old = self.deploy_info.status
self.deploy_info.status = status
if self._dump_deploy_info():
return True
self.deploy_info.status = old
return False
def _update_deploy_config_status(self, status):
old = self.deploy_info.config_status
self.deploy_info.config_status = status
if self._dump_deploy_info():
return True
self.deploy_info.config_status = old
return False
def _dump_upgrade_meta_data(self):
path = self.get_upgrade_meta_path(self.config_dir)
self.stdio and getattr(self.stdio, 'verbose', print)('dump upgrade meta data to %s' % path)
try:
if self._uprade_meta:
with open(path, 'wb') as f:
yaml.dump(self._uprade_meta, f)
else:
FileUtil.rm(path, self.stdio)
return True
except:
self.stdio and getattr(self.stdio, 'exception', print)('dump upgrade meta data to %s failed' % path)
return False
def start_upgrade(self, component, **uprade_ctx):
if self.deploy_info.status != DeployStatus.STATUS_RUNNING:
return False
self._uprade_meta = {
'component': component,
'uprade_ctx': uprade_ctx
}
if self._dump_upgrade_meta_data() and self._update_deploy_status(DeployStatus.STATUS_UPRADEING):
return True
self._uprade_meta = None
return False
def update_upgrade_ctx(self, **uprade_ctx):
if self.deploy_info.status != DeployStatus.STATUS_UPRADEING:
return False
uprade_meta = deepcopy(self._get_uprade_meta())
self._uprade_meta['uprade_ctx'].update(uprade_ctx)
if self._dump_upgrade_meta_data():
return True
self._uprade_meta = uprade_meta
return False
def _update_componet_repository(self, component, repository):
if not self.deploy_config.update_component_package_hash(component, repository.hash, repository.version):
return False
self.use_model(component, repository)
return True
def stop_upgrade(self, dest_repository=None):
if self._update_deploy_status(DeployStatus.STATUS_RUNNING):
self._uprade_meta = None
self._dump_upgrade_meta_data()
if dest_repository:
self._update_componet_repository(dest_repository.name, dest_repository)
return True
return False
def update_deploy_status(self, status):
if isinstance(status, DeployStatus):
self.deploy_info.status = status
if DeployStatus.STATUS_DESTROYED == status:
self.deploy_info.components = {}
return self._dump_deploy_info()
if self._update_deploy_status(status):
if DeployStatus.STATUS_DESTROYED == status:
self.deploy_info.components = {}
self._dump_deploy_info()
return True
return False
def update_deploy_config_status(self, status):
if isinstance(status, DeployConfigStatus):
self.deploy_info.config_status = status
return self._dump_deploy_info()
return self._update_deploy_config_status(status)
return False
......@@ -573,24 +711,36 @@ class DeployManager(Manager):
RELATIVE_PATH = 'cluster/'
def __init__(self, home_path, stdio=None):
def __init__(self, home_path, lock_manager=None, stdio=None):
super(DeployManager, self).__init__(home_path, stdio)
self.lock_manager = lock_manager
def _lock(self, name, read_only=False):
if self.lock_manager:
if read_only:
return self.lock_manager.deploy_sh_lock(name)
else:
return self.lock_manager.deploy_ex_lock(name)
return True
def get_deploy_configs(self):
def get_deploy_configs(self, read_only=True):
configs = []
for file_name in os.listdir(self.path):
path = os.path.join(self.path, file_name)
for name in os.listdir(self.path):
path = os.path.join(self.path, name)
if os.path.isdir(path):
self._lock(name, read_only)
configs.append(Deploy(path, self.stdio))
return configs
def get_deploy_config(self, name):
def get_deploy_config(self, name, read_only=False):
self._lock(name, read_only)
path = os.path.join(self.path, name)
if os.path.isdir(path):
return Deploy(path, self.stdio)
return None
def create_deploy_config(self, name, src_yaml_path):
self._lock(name)
config_dir = os.path.join(self.path, name)
target_src_path = Deploy.get_deploy_yaml_path(config_dir)
self._mkdir(config_dir)
......@@ -601,5 +751,6 @@ class DeployManager(Manager):
return None
def remove_deploy_config(self, name):
self._lock(name)
config_dir = os.path.join(self.path, name)
self._rm(config_dir)
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
import time
from enum import Enum
from tool import FileUtil
from _manager import Manager
class LockType(Enum):
MIR_REPO = 'mirror_and_repo'
DEPLOY = 'deploy'
GLOBAL = 'global'
class MixLock(object):
def __init__(self, path, stdio=None):
self.path = path
self.stdio = stdio
self._lock_obj = None
self._sh_cnt = 0
self._ex_cnt = 0
def __del__(self):
self._unlock()
@property
def lock_obj(self):
if self._lock_obj is None or self._lock_obj.closed:
self._lock_obj = FileUtil.open(self.path, _type='w')
return self._lock_obj
@property
def locked(self):
return self._sh_cnt or self._ex_cnt
def _ex_lock(self):
if self.lock_obj:
FileUtil.exclusive_lock_obj(self.lock_obj, stdio=self.stdio)
def _sh_lock(self):
if self.lock_obj:
FileUtil.share_lock_obj(self.lock_obj, stdio=self.stdio)
def sh_lock(self):
if not self.locked:
self._sh_lock()
self._sh_cnt += 1
self.stdio and getattr(self.stdio, 'verbose', print)('share lock `%s`, count %s' % (self.path, self._sh_cnt))
return True
def ex_lock(self):
if self._ex_cnt == 0:
try:
self._ex_lock()
except Exception as e:
if self._sh_cnt:
self.lock_escalation(LockManager.TRY_TIMES)
else:
raise e
self._ex_cnt += 1
self.stdio and getattr(self.stdio, 'verbose', print)('exclusive lock `%s`, count %s' % (self.path, self._ex_cnt))
return True
def lock_escalation(self, try_times):
self.stdio and getattr(self.stdio, 'start_loading', print)('waiting for the lock')
try:
self._lock_escalation(try_times)
self.stdio and getattr(self.stdio, 'stop_loading', print)('succeed')
except Exception as e:
self.stdio and getattr(self.stdio, 'stop_loading', print)('fail')
raise e
def _lock_escalation(self, try_times):
stdio = self.stdio
while try_times:
try:
if try_times % 1000:
self.stdio = None
else:
self.stdio = stdio
try_times -= 1
self._ex_lock()
break
except KeyboardInterrupt:
self.stdio = stdio
raise IOError('fail to get lock')
except Exception as e:
if try_times:
time.sleep(LockManager.TRY_INTERVAL)
else:
self.stdio = stdio
raise e
self.stdio = stdio
def _sh_unlock(self):
if self._sh_cnt == 0:
if self._ex_cnt == 0:
self._unlock()
def _ex_unlock(self):
if self._ex_cnt == 0:
if self._sh_cnt > 0:
self._sh_lock()
else:
self._unlock()
def sh_unlock(self):
if self._sh_cnt > 0:
self._sh_cnt -= 1
self.stdio and getattr(self.stdio, 'verbose', print)('share lock %s release, count %s' % (self.path, self._sh_cnt))
self._sh_unlock()
return self.locked is False
def ex_unlock(self):
if self._ex_cnt > 0:
self._ex_cnt -= 1
self.stdio and getattr(self.stdio, 'verbose', print)('exclusive lock %s release, count %s' % (self.path, self._ex_cnt))
self._ex_unlock()
return self.locked is False
def _unlock(self):
if self._lock_obj:
FileUtil.unlock(self._lock_obj, stdio=self.stdio)
self._lock_obj.close()
self._lock_obj = None
self._sh_cnt = 0
self._ex_cnt = 0
class Lock(object):
def __init__(self, mix_lock):
self.mix_lock = mix_lock
def lock(self):
raise NotImplementedError
def unlock(self):
raise NotImplementedError
class SHLock(Lock):
def lock(self):
self.mix_lock.sh_lock()
def unlock(self):
self.mix_lock.sh_unlock()
class EXLock(Lock):
def lock(self):
self.mix_lock.ex_lock()
def unlock(self):
self.mix_lock.ex_unlock()
class LockManager(Manager):
TRY_TIMES = 6000
TRY_INTERVAL = 0.01
RELATIVE_PATH = 'lock/'
GLOBAL_FN = LockType.GLOBAL.value
MIR_REPO_FN = LockType.MIR_REPO.value
DEPLOY_FN_PERFIX = LockType.DEPLOY.value
LOCKS = {}
def __init__(self, home_path, stdio=None):
super(LockManager, self).__init__(home_path, stdio)
self.locks = []
self.global_path = os.path.join(self.path, self.GLOBAL_FN)
self.mir_repo_path = os.path.join(self.path, self.MIR_REPO_FN)
@staticmethod
def set_try_times(try_times):
LockManager.TRY_TIMES = try_times
@staticmethod
def set_try_interval(try_interval):
LockManager.TRY_INTERVAL = try_interval
def __del__(self):
for lock in self.locks[::-1]:
lock.unlock()
def _get_mix_lock(self, path):
if path not in self.LOCKS:
self.LOCKS[path] = MixLock(path, stdio=self.stdio)
return self.LOCKS[path]
def _lock(self, path, clz):
mix_lock = self._get_mix_lock(path)
lock = clz(mix_lock)
lock.lock()
self.locks.append(lock)
return True
def _sh_lock(self, path):
return self._lock(path, SHLock)
def _ex_lock(self, path):
return self._lock(path, EXLock)
def global_ex_lock(self):
return self._ex_lock(self.global_path)
def global_sh_lock(self):
return self._sh_lock(self.global_path)
def mirror_and_repo_ex_lock(self):
return self._ex_lock(self.mir_repo_path)
def mirror_and_repo_sh_lock(self):
return self._sh_lock(self.mir_repo_path)
def _deploy_lock_fp(self, deploy_name):
return os.path.join(self.path, '%s_%s' % (self.DEPLOY_FN_PERFIX, deploy_name))
def deploy_ex_lock(self, deploy_name):
return self._ex_lock(self._deploy_lock_fp(deploy_name))
def deploy_sh_lock(self, deploy_name):
return self._sh_lock(self._deploy_lock_fp(deploy_name))
此差异已折叠。
......@@ -21,6 +21,7 @@
from __future__ import absolute_import, division, print_function
import os
import re
import sys
from enum import Enum
from glob import glob
......@@ -262,15 +263,232 @@ class PyScriptPlugin(ScriptPlugin):
class ParamPlugin(Plugin):
class ConfigItemType(object):
TYPE_STR = None
def __init__(self, s):
try:
self._origin = s
self._value = 0
self._format()
except:
raise Exception("'%s' is not %s" % (self._origin, self._type_str))
@property
def _type_str(self):
if self.TYPE_STR is None:
self.TYPE_STR = str(self.__class__.__name__).split('.')[-1]
return self.TYPE_STR
def _format(self):
raise NotImplementedError
def __str__(self):
return str(self._origin)
def __hash__(self):
return self._origin.__hash__()
@property
def __cmp_value__(self):
return self._value
def __eq__(self, value):
if value is None:
return False
return self.__cmp_value__ == value.__cmp_value__
def __gt__(self, value):
if value is None:
return True
return self.__cmp_value__ > value.__cmp_value__
def __ge__(self, value):
if value is None:
return True
return self.__eq__(value) or self.__gt__(value)
def __lt__(self, value):
if value is None:
return False
return self.__cmp_value__ < value.__cmp_value__
def __le__(self, value):
if value is None:
return False
return self.__eq__(value) or self.__lt__(value)
class Moment(ConfigItemType):
def _format(self):
if self._origin:
if self._origin.upper() == 'DISABLE':
self._value = 0
else:
r = re.match('^(\d{1,2}):(\d{1,2})$', self._origin)
h, m = r.groups()
h, m = int(h), int(m)
if 0 <= h <= 23 and 0 <= m <= 60:
self._value = h * 60 + m
else:
raise Exception('Invalid Value')
else:
self._value = 0
class Time(ConfigItemType):
UNITS = {
'ns': 0.000000001,
'us': 0.000001,
'ms': 0.001,
's': 1,
'm': 60,
'h': 3600,
'd': 86400
}
def _format(self):
if self._origin:
self._origin = str(self._origin).strip()
if self._origin.isdigit():
n = self._origin
unit = self.UNITS['s']
else:
r = re.match('^(\d+)(\w+)$', self._origin.lower())
n, u = r.groups()
unit = self.UNITS.get(u.lower())
if unit:
self._value = int(n) * unit
else:
raise Exception('Invalid Value')
else:
self._value = 0
class Capacity(ConfigItemType):
UNITS = {"B": 1, "K": 1<<10, "M": 1<<20, "G": 1<<30, "T": 1<<40, 'P': 1 << 50}
def _format(self):
if self._origin:
self._origin = str(self._origin).strip()
if self._origin.isdigit():
n = self._origin
unit = self.UNITS['M']
else:
r = re.match('^(\d+)(\w)B?$', self._origin.upper())
n, u = r.groups()
unit = self.UNITS.get(u.upper())
if unit:
self._value = int(n) * unit
else:
raise Exception('Invalid Value')
else:
self._value = 0
class StringList(ConfigItemType):
def _format(self):
if self._origin:
self._origin = str(self._origin).strip()
self._value = self._origin.split(';')
else:
self._value = []
class Double(ConfigItemType):
def _format(self):
self._value = float(self._origin) if self._origin else 0
class Boolean(ConfigItemType):
def _format(self):
if isinstance(self._origin, bool):
self._value = self._origin
else:
_origin = str(self._origin).lower()
if _origin.isdigit() or _origin in ['true', 'false']:
self._value = bool(self._origin)
else:
raise Exception('%s is not Boolean')
class Integer(ConfigItemType):
def _format(self):
if self._origin is None:
self._value = 0
self._origin = 0
else:
_origin = str(self._origin)
if _origin.isdigit():
self._value = int(_origin)
else:
raise Exception('%s is not Integer')
class String(ConfigItemType):
def _format(self):
self._value = str(self._origin) if self._origin else ''
class ConfigItem(object):
def __init__(self, name, default=None, require=False, need_restart=False, need_redeploy=False):
def __init__(
self,
name,
param_type=str,
default=None,
min_value=None,
max_value=None,
require=False,
need_restart=False,
need_redeploy=False,
modify_limit=None
):
self.name = name
self.default = default
self.require = require
self.need_restart = need_restart
self.need_redeploy = need_redeploy
self._param_type = param_type
self.min_value = param_type(min_value) if min_value is not None else None
self.max_value = param_type(max_value) if max_value is not None else None
self.modify_limit = getattr(self, ('_%s_limit' % modify_limit).lower(), self._none_limit)
self.had_modify_limit = self.modify_limit != self._none_limit
def param_type(self, value):
try:
return self._param_type(value)
except Exception as e:
raise Exception('%s: %s' % (self.name, e))
def check_value(self, value):
if not isinstance(value, self._param_type):
value = self.param_type(value)
if self.min_value is not None and value < self.min_value:
raise Exception('%s less then %s' % (self.name, self.min_value))
if self.max_value is not None and value > self.max_value:
raise Exception('%s more then %s' % (self.name, self.max_value))
return True
def _modify_limit(self, old_value, new_value):
if old_value == new_value:
return True
raise Exception('DO NOT modify %s after startup' % self.name)
def _increase_limit(self, old_value, new_value):
if self.param_type(new_value) > self.param_type(old_value):
raise Exception('DO NOT increase %s after startup' % self.name)
return True
def _decrease_limit(self, old_value, new_value):
if self.param_type(new_value) < self.param_type(old_value):
raise Exception('DO NOT decrease %s after startup' % self.name)
return True
def _none_limit(self, old_value, new_value):
return True
PLUGIN_TYPE = PluginType.PARAM
DEF_PARAM_YAML = 'parameter.yaml'
FLAG_FILE = DEF_PARAM_YAML
......@@ -279,51 +497,92 @@ class ParamPlugin(Plugin):
super(ParamPlugin, self).__init__(component_name, plugin_path, version)
self.def_param_yaml_path = os.path.join(self.plugin_path, self.DEF_PARAM_YAML)
self._src_data = None
self._need_redploy_items = None
self._had_modify_limit_items = None
self._need_restart_items = None
self._params_default = None
@property
def params(self):
if self._src_data is None:
try:
TYPES = {
'DOUBLE': ParamPlugin.Double,
'BOOL': ParamPlugin.Boolean,
'INT': ParamPlugin.Integer,
'STRING': ParamPlugin.String,
'MOMENT': ParamPlugin.Moment,
'TIME': ParamPlugin.Time,
'CAPACITY': ParamPlugin.Capacity,
'STRING_LIST': ParamPlugin.StringList
}
self._src_data = {}
with open(self.def_param_yaml_path, 'rb') as f:
configs = yaml.load(f)
for conf in configs:
param_type = ConfigUtil.get_value_from_dict(conf, 'type', 'STRING').upper()
if param_type in TYPES:
param_type = TYPES[param_type]
else:
param_type = ParamPlugin.String
self._src_data[conf['name']] = ParamPlugin.ConfigItem(
conf['name'],
ConfigUtil.get_value_from_dict(conf, 'default', None),
ConfigUtil.get_value_from_dict(conf, 'require', False),
ConfigUtil.get_value_from_dict(conf, 'need_restart', False),
ConfigUtil.get_value_from_dict(conf, 'need_redeploy', False),
name=conf['name'],
param_type=param_type,
default=ConfigUtil.get_value_from_dict(conf, 'default', None),
min_value=ConfigUtil.get_value_from_dict(conf, 'min_value', None),
max_value=ConfigUtil.get_value_from_dict(conf, 'max_value', None),
modify_limit=ConfigUtil.get_value_from_dict(conf, 'modify_limit', None),
require=ConfigUtil.get_value_from_dict(conf, 'require', False),
need_restart=ConfigUtil.get_value_from_dict(conf, 'need_restart', False),
need_redeploy=ConfigUtil.get_value_from_dict(conf, 'need_redeploy', False)
)
except:
pass
return self._src_data
def get_need_redeploy_items(self):
items = []
params = self.params
for name in params:
conf = params[name]
if conf.need_redeploy:
items.append(name)
return items
def get_need_restart_items(self):
items = []
params = self.params
for name in params:
conf = params[name]
if conf.need_restart:
items.append(name)
return items
def get_params_default(self):
temp = {}
params = self.params
for name in params:
conf = params[name]
temp[conf.name] = conf.default
return temp
@property
def redploy_params(self):
if self._need_redploy_items is None:
self._need_redploy_items = []
params = self.params
for name in params:
conf = params[name]
if conf.need_redeploy:
self._need_redploy_items.append(conf)
return self._need_redploy_items
@property
def modify_limit_params(self):
if self._had_modify_limit_items is None:
self._had_modify_limit_items = []
params = self.params
for name in params:
conf = params[name]
if conf.had_modify_limit:
self._had_modify_limit_items.append(conf)
return self._had_modify_limit_items
@property
def restart_params(self):
if self._need_restart_items is None:
self._need_restart_items = []
params = self.params
for name in params:
conf = params[name]
if conf.need_restart:
self._need_restart_items.append(conf)
return self._need_restart_items
@property
def params_default(self):
if self._params_default is None:
self._params_default = {}
params = self.params
for name in params:
conf = params[name]
temp[conf.name] = conf.default
return self._params_default
class InstallPlugin(Plugin):
......@@ -344,34 +603,65 @@ class InstallPlugin(Plugin):
PLUGIN_TYPE = PluginType.INSTALL
FILES_MAP_YAML = 'file_map.yaml'
FLAG_FILE = FILES_MAP_YAML
_KEYCRE = re.compile(r"\$(\w+)")
def __init__(self, component_name, plugin_path, version):
super(InstallPlugin, self).__init__(component_name, plugin_path, version)
self.file_map_path = os.path.join(self.plugin_path, self.FILES_MAP_YAML)
self._file_map = None
@property
def file_map(self):
if self._file_map is None:
self._file_map = {}
@classmethod
def var_replace(cls, string, var):
if not var:
return string
done = []
while string:
m = cls._KEYCRE.search(string)
if not m:
done.append(string)
break
varname = m.group(1).lower()
replacement = var.get(varname, m.group())
start, end = m.span()
done.append(string[:start])
done.append(str(replacement))
string = string[end:]
return ''.join(done)
def file_map(self, package_info):
var = {
'name': package_info.name,
'version': package_info.version,
'release': package_info.release,
'arch': package_info.arch,
'md5': package_info.md5,
}
key = str(var)
if not self._file_map.get(key):
try:
self._file_map = {}
file_map = {}
with open(self.file_map_path, 'rb') as f:
file_map = yaml.load(f)
for data in file_map:
for data in yaml.load(f):
k = data['src_path']
if k[0] != '.':
k = '.%s' % os.path.join('/', k)
self._file_map[k] = InstallPlugin.FileItem(
k = self.var_replace(k, var)
file_map[k] = InstallPlugin.FileItem(
k,
ConfigUtil.get_value_from_dict(data, 'target_path', k),
getattr(InstallPlugin.FileItemType, ConfigUtil.get_value_from_dict(data, 'type', 'FILE').upper(), None)
)
self._file_map[key] = file_map
except:
pass
return self._file_map
return self._file_map[key]
def file_list(self):
file_map = self.file_map
def file_list(self, package_info):
file_map = self.file_map(package_info)
return [file_map[k] for k in file_map]
......
......@@ -66,13 +66,36 @@ class LocalPackage(Package):
self.md5 = None
self.arch = arch if arch else getBaseArch()
self.headers = {}
self.files = files
self.files = self.get_all_files(files)
self.path = path
self.package()
def __hash__(self):
return hash(self.path)
@staticmethod
def get_all_files(source_files):
files = {}
for src_path, target_path in source_files.items():
if not os.path.isdir(target_path) or os.path.islink(target_path):
files[src_path] = target_path
else:
files[src_path+'/'] = target_path
for fp in LocalPackage.list_dir(target_path):
files[os.path.join(src_path, os.path.relpath(fp, target_path))] = fp
return files
@staticmethod
def list_dir(path):
files = []
for fn in os.listdir(path):
fp = os.path.join(path, fn)
if not os.path.isdir(fp) or os.path.islink(fp):
files.append(fp)
else:
files += LocalPackage.list_dir(fp)
return files
def package(self):
count = 0
dirnames = []
......@@ -90,21 +113,22 @@ class LocalPackage(Package):
dirnames.append(dirname)
dirnames_map[dirname] = count
count += 1
basenames.append(basename)
dirindexes.append(dirnames_map[dirname])
if os.path.islink(target_path):
filemd5s.append('')
filelinktos.append(os.readlink(target_path))
filemodes.append(-24065)
else:
m = hashlib.md5()
with open(target_path, 'rb') as f:
m.update(f.read())
m_value = m.hexdigest().encode(sys.getdefaultencoding())
m_sum.update(m_value)
filemd5s.append(m_value)
filelinktos.append('')
filemodes.append(os.stat(target_path).st_mode)
if basename:
basenames.append(basename)
dirindexes.append(dirnames_map[dirname])
if os.path.islink(target_path):
filemd5s.append('')
filelinktos.append(os.readlink(target_path))
filemodes.append(-24065)
else:
m = hashlib.md5()
with open(target_path, 'rb') as f:
m.update(f.read())
m_value = m.hexdigest().encode(sys.getdefaultencoding())
m_sum.update(m_value)
filemd5s.append(m_value)
filelinktos.append('')
filemodes.append(os.stat(target_path).st_mode)
self.headers = {
'dirnames': dirnames,
'filemd5s': filemd5s,
......@@ -152,7 +176,7 @@ class Repository(PackageInfo):
def bin_list(self, plugin):
files = []
if self.version and self.hash:
for file_item in plugin.file_list():
for file_item in plugin.file_list(self):
if file_item.type == InstallPlugin.FileItemType.BIN:
files.append(os.path.join(self.repository_dir, file_item.target_path))
return files
......@@ -160,7 +184,7 @@ class Repository(PackageInfo):
def file_list(self, plugin):
files = []
if self.version and self.hash:
for file_item in plugin.file_list():
for file_item in plugin.file_list(self):
path = os.path.join(self.repository_dir, file_item.target_path)
if file_item.type == InstallPlugin.FileItemType.DIR:
files += DirectoryUtil.list_dir(path)
......@@ -189,6 +213,7 @@ class Repository(PackageInfo):
self.set_release(data.get('release'))
self.md5 = data.get('hash')
self.arch = data.get('arch')
self.install_time = data.get('install_time', 0)
except:
pass
......@@ -205,6 +230,8 @@ class Repository(PackageInfo):
def _dump(self):
data = {'version': self.version, 'hash': self.hash, 'release': self.release, 'arch': self.arch}
if self.install_time:
data['install_time'] = self.install_time
try:
with open(self.data_file_path, 'w') as f:
YamlLoader().dump(data, f)
......@@ -223,7 +250,7 @@ class Repository(PackageInfo):
self.clear()
try:
with pkg.open() as rpm:
file_map = plugin.file_map
file_map = plugin.file_map(pkg)
need_dirs = {}
need_files = {}
for src_path in file_map:
......@@ -268,7 +295,7 @@ class Repository(PackageInfo):
if filemd5s[idx]:
fd = rpm.extractfile(src_path)
self.stdio and getattr(self.stdio, 'verbose', print)('extract %s to %s' % (src_path, target_path))
with FileUtil.open(target_path, 'wb', self.stdio) as f:
with FileUtil.open(target_path, 'wb', stdio=self.stdio) as f:
FileUtil.copy_fileobj(fd, f)
mode = filemodes[idx] & 0x1ff
if mode != 0o744:
......@@ -283,12 +310,15 @@ class Repository(PackageInfo):
os.symlink(links[link], link)
for n_dir in need_dirs:
path = os.path.join(self.repository_dir, need_dirs[n_dir])
if not os.path.exists(path) and n_dir[:-1] in dirnames:
DirectoryUtil.mkdir(path)
if not os.path.isdir(path):
raise Exception('%s: No such dir: %s' % (pkg.path, n_dir))
raise Exception('%s in %s is not dir.' % (pkg.path, n_dir))
self.set_version(pkg.version)
self.set_release(pkg.release)
self.md5 = pkg.md5
self.arch = pkg.arch
self.install_time = time.time()
if self._dump():
return True
else:
......@@ -365,11 +395,12 @@ class ComponentRepository(object):
def get_repository_by_tag(self, tag, version=None):
path_partten = os.path.join(self.repository_dir, version if version else '*', tag)
repository = None
for path in glob(path_partten):
repository = Repository(self.name, path, self.stdio)
if repository.hash:
return repository
return None
n_repository = Repository(self.name, path, self.stdio)
if n_repository.hash and n_repository > repository:
repository = n_repository
return repository
def get_repository(self, version=None, tag=None):
if tag:
......@@ -391,7 +422,9 @@ class ComponentRepository(object):
repositories = []
path_partten = os.path.join(self.repository_dir, version, '*')
for path in glob(path_partten):
repositories.append(Repository(self.name, path, self.stdio))
repository = Repository(self.name, path, self.stdio)
if repository.hash:
repositories.append(repository)
return repositories
......@@ -400,10 +433,19 @@ class RepositoryManager(Manager):
RELATIVE_PATH = 'repository'
# repository目录结构为 ./repository/{component_name}/{version}/{tag or hash}
def __init__(self, home_path, stdio=None):
def __init__(self, home_path, lock_manager=None, stdio=None):
super(RepositoryManager, self).__init__(home_path, stdio=stdio)
self.repositories = {}
self.component_repositoies = {}
self.lock_manager = lock_manager
def _lock(self, read_only=False):
if self.lock_manager:
if read_only:
return self.lock_manager.mirror_and_repo_sh_lock()
else:
return self.lock_manager.mirror_and_repo_ex_lock()
return True
def _get_repository_vo(self, repository):
return RepositoryVO(
......@@ -416,6 +458,13 @@ class RepositoryManager(Manager):
[]
)
def get_repositories(self, name, version=None, instance=True):
repositories = []
for repository in self.get_component_repositoy(name).get_repositories(version):
if instance and repository.is_shadow_repository() is False:
repositories.append(repository)
return repositories
def get_repositories_view(self, name=None):
if name:
repositories = self.get_component_repositoy(name).get_repositories()
......@@ -440,6 +489,7 @@ class RepositoryManager(Manager):
def get_component_repositoy(self, name):
if name not in self.component_repositoies:
self._lock(True)
path = os.path.join(self.path, name)
self.component_repositoies[name] = ComponentRepository(name, path, self.stdio)
return self.component_repositoies[name]
......@@ -471,6 +521,7 @@ class RepositoryManager(Manager):
def create_instance_repository(self, name, version, _hash):
path = os.path.join(self.path, name, version, _hash)
if path not in self.repositories:
self._lock()
self._mkdir(path)
repository = Repository(name, path, self.stdio)
self.repositories[path] = repository
......@@ -480,6 +531,7 @@ class RepositoryManager(Manager):
path = os.path.join(self.path, name, version, tag if tag else name)
if os.path.exists(path):
if path not in self.repositories:
self._lock(True)
self.repositories[path] = Repository(name, path, self.stdio)
return self.repositories[path]
repository = Repository(name, path, self.stdio)
......@@ -489,6 +541,7 @@ class RepositoryManager(Manager):
def create_tag_for_repository(self, repository, tag, force=False):
if repository.is_shadow_repository():
return False
self._lock()
path = os.path.join(self.path, repository.name, repository.version, tag)
if os.path.exists(path):
if not os.path.islink(path):
......@@ -509,6 +562,7 @@ class RepositoryManager(Manager):
def get_instance_repository_from_shadow(self, repository):
if not isinstance(repository, Repository) or not repository.is_shadow_repository():
return repository
self._lock(True)
try:
path = os.readlink(repository.repository_dir)
if path not in self.repositories:
......
......@@ -100,6 +100,9 @@ class PackageInfo(object):
def __cmp_value__(self):
return [self.version, self.release]
def __hash__(self):
return hash(self.md5)
def __eq__(self, value):
if value is None:
return False
......
......@@ -211,10 +211,19 @@ class IO(object):
ERROR_PREV = FormtatText.error('[ERROR]')
IS_TTY = sys.stdin.isatty()
def __init__(self, level, msg_lv=MsgLevel.DEBUG, trace_logger=None, track_limit=0, root_io=None, stream=sys.stdout):
def __init__(self,
level,
msg_lv=MsgLevel.DEBUG,
trace_logger=None,
use_cache=False,
track_limit=0,
root_io=None,
stream=sys.stdout
):
self.level = level
self.msg_lv = msg_lv
self.trace_logger = trace_logger
self._log_cache = [] if use_cache else None
self._root_io = root_io
self.track_limit = track_limit
self._verbose_prefix = '-' * self.level
......@@ -224,6 +233,12 @@ class IO(object):
self._cur_out_obj = self._out_obj
self._before_critical = None
@property
def log_cache(self):
if self._root_io:
self._root_io.log_cache
return self._log_cache
def before_close(self):
if self._before_critical:
try:
......@@ -231,8 +246,35 @@ class IO(object):
except:
pass
def __del__(self):
def _close(self):
self.before_close()
self._flush_log()
def __del__(self):
self._close()
def exit(self, code):
self._close()
sys.exit(code)
def set_cache(self, status):
if status:
self._cache_on()
def _cache_on(self):
if self._root_io:
return False
if self.log_cache is None:
self._log_cache = []
return True
def _cache_off(self):
if self._root_io:
return False
if self.log_cache is not None:
self._flush_log()
self._log_cache = None
return True
def get_cur_out_obj(self):
if self._root_io:
......@@ -303,7 +345,7 @@ class IO(object):
return False
self.sync_obj = self._start_sync_obj(IOHalo, lambda x: x.stop_loading('fail'), *arg, **kwargs)
if self.sync_obj:
self._log(MsgLevel.INFO, text)
self.log(MsgLevel.INFO, text)
return self.sync_obj.start(text)
def stop_loading(self, stop_type, *arg, **kwargs):
......@@ -319,7 +361,7 @@ class IO(object):
return False
self.sync_obj = self._start_sync_obj(IOProgressBar, lambda x: x.finish_progressbar(), text=text, maxval=maxval)
if self.sync_obj:
self._log(MsgLevel.INFO, text)
self.log(MsgLevel.INFO, text)
return self.sync_obj.start()
def update_progressbar(self, value):
......@@ -389,10 +431,31 @@ class IO(object):
kwargs['file'] = self.get_cur_out_obj()
kwargs['file'] and print(self._format(msg, *args), **kwargs)
del kwargs['file']
self._log(msg_lv, msg, *args, **kwargs)
self.log(msg_lv, msg, *args, **kwargs)
def log(self, levelno, msg, *args, **kwargs):
self._cache_log(levelno, msg, *args, **kwargs)
def _cache_log(self, levelno, msg, *args, **kwargs):
if self.trace_logger:
log_cache = self.log_cache
lines = str(msg).split('\n')
for line in lines:
if log_cache is None:
self._log(levelno, line, *args, **kwargs)
else:
log_cache.append((levelno, line, args, kwargs))
def _flush_log(self):
if not self._root_io and self.trace_logger and self._log_cache:
for levelno, line, args, kwargs in self._log_cache:
self.trace_logger.log(levelno, line, *args, **kwargs)
self._log_cache = []
def _log(self, levelno, msg, *args, **kwargs):
self.trace_logger and self.trace_logger.log(levelno, msg, *args, **kwargs)
if self.trace_logger:
self.trace_logger.log(levelno, msg, *args, **kwargs)
def print(self, msg, *args, **kwargs):
self._print(MsgLevel.INFO, msg, *args, **kwargs)
......@@ -405,17 +468,13 @@ class IO(object):
def critical(self, msg, *args, **kwargs):
if self._root_io:
return self.critical(msg, *args, **kwargs)
return self._root_io.critical(msg, *args, **kwargs)
self._print(MsgLevel.CRITICAL, '%s %s' % (self.ERROR_PREV, msg), *args, **kwargs)
self.exit(kwargs['code'] if 'code' in kwargs else 255)
def exit(self, code):
self.before_close()
sys.exit(code)
def verbose(self, msg, *args, **kwargs):
if self.level > self.VERBOSE_LEVEL:
self._log(MsgLevel.VERBOSE, '%s %s' % (self._verbose_prefix, msg), *args, **kwargs)
self.log(MsgLevel.VERBOSE, '%s %s' % (self._verbose_prefix, msg), *args, **kwargs)
return
self._print(MsgLevel.VERBOSE, '%s %s' % (self._verbose_prefix, msg), *args, **kwargs)
......@@ -448,7 +507,7 @@ class IO(object):
if self.level <= self.VERBOSE_LEVEL:
print_stack = lambda m: self._print(MsgLevel.ERROR, m)
else:
print_stack = lambda m: self._log(MsgLevel.ERROR, m)
print_stack = lambda m: self.log(MsgLevel.ERROR, m)
msg and self.error(msg)
print_stack('\n'.join(exception_msg))
else:
......@@ -465,7 +524,7 @@ class IO(object):
if self.level <= self.VERBOSE_LEVEL:
print_stack = lambda m: self._print(MsgLevel.ERROR, m)
else:
print_stack = lambda m: self._log(MsgLevel.ERROR, m)
print_stack = lambda m: self.log(MsgLevel.ERROR, m)
msg and self.error(msg)
print_stack(''.join(lines))
此差异已折叠。
......@@ -21,6 +21,10 @@ oceanbase-ce:
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# Please set devname as the network adaptor's name whose ip is in the setting of severs.
# if set severs as "127.0.0.1", please set devname as "lo"
# if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0"
# devname: eth0
# External port for OceanBase Database. The default value is 2881. DO NOT change this value after the cluster is started.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882. DO NOT change this value after the cluster is started.
......
......@@ -21,6 +21,10 @@ oceanbase-ce:
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# Please set devname as the network adaptor's name whose ip is in the setting of severs.
# if set severs as "127.0.0.1", please set devname as "lo"
# if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0"
# devname: eth0
# External port for OceanBase Database. The default value is 2881. DO NOT change this value after the cluster is started.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882. DO NOT change this value after the cluster is started.
......
......@@ -21,6 +21,10 @@ oceanbase-ce:
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# Please set devname as the network adaptor's name whose ip is in the setting of severs.
# if set severs as "127.0.0.1", please set devname as "lo"
# if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0"
# devname: eth0
# External port for OceanBase Database. The default value is 2881.DO NOT change this value after the cluster is started.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882. DO NOT change this value after the cluster is started.
......
......@@ -16,6 +16,10 @@ oceanbase-ce:
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# Please set devname as the network adaptor's name whose ip is in the setting of severs.
# if set severs as "127.0.0.1", please set devname as "lo"
# if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0"
# devname: eth0
# External port for OceanBase Database. The default value is 2881. DO NOT change this value after the cluster is started.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882. DO NOT change this value after the cluster is started.
......
......@@ -16,6 +16,10 @@ oceanbase-ce:
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# Please set devname as the network adaptor's name whose ip is in the setting of severs.
# if set severs as "127.0.0.1", please set devname as "lo"
# if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0"
# devname: eth0
# External port for OceanBase Database. The default value is 2881. DO NOT change this value after the cluster is started.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882. DO NOT change this value after the cluster is started.
......
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
import os
import sys
import time
import logging
import getopt
import json
import string
import random
import datetime
import decimal
import ConfigParser
import socket
import platform
if __name__ == '__main__':
defaultencoding = 'utf-8'
if sys.getdefaultencoding() != defaultencoding:
try:
from imp import reload
except:
pass
reload(sys)
sys.setdefaultencoding(defaultencoding)
OBD_INSTALL_PRE = os.environ.get('OBD_INSTALL_PRE', '/')
sys.path.append(os.path.join(OBD_INSTALL_PRE, 'usr/obd/lib/executer/executer27/site-packages'))
execfile(sys.argv[1])
......@@ -7,11 +7,6 @@ create database if not exists test;
use test;
grant all on *.* to 'admin' WITH GRANT OPTION;
alter system set merger_warm_up_duration_time = '0s';
alter system set zone_merge_concurrency = 2;
alter system set merger_check_interval = '10s';
alter system set enable_syslog_wf=false;
alter system set _enable_split_partition = true;
#FIXME: schema拆分模式建租户耗时增加,这里先加大语句超时时间先绕过
set @@session.ob_query_timeout = 40000000;
create resource unit box1 max_cpu 2, max_memory 4073741824, max_iops 128, max_disk_size '5G', max_session_num 64, MIN_CPU=1, MIN_MEMORY=4073741824, MIN_IOPS=128;
......
......@@ -7,11 +7,6 @@ create database if not exists test;
use test;
grant all on *.* to 'admin' WITH GRANT OPTION;
alter system set merger_warm_up_duration_time = '0s';
alter system set zone_merge_concurrency = 2;
alter system set merger_check_interval = '10s';
alter system set enable_syslog_wf=false;
alter system set _enable_split_partition = true;
#FIXME: schema拆分模式建租户耗时增加,这里先加大语句超时时间先绕过
set @@session.ob_query_timeout = 40000000;
create resource unit box1 max_cpu 2, max_memory 805306368, max_iops 128, max_disk_size '5G', max_session_num 64, MIN_CPU=1, MIN_MEMORY=805306368, MIN_IOPS=128;
......
create resource unit box1 max_cpu 2, max_memory 1073741824, max_iops 128, max_disk_size '5G', max_session_num 64, MIN_CPU=1, MIN_MEMORY=1073741824, MIN_IOPS=128;
create resource pool pool1 unit = 'box1', unit_num = 1;
create tenant ora_tt replica_num = 1, resource_pool_list=('pool1') set ob_tcp_invited_nodes='%', ob_compatibility_mode='oracle';
alter tenant ora_tt set variables autocommit='on';
alter tenant ora_tt set variables nls_date_format='YYYY-MM-DD HH24:MI:SS';
alter tenant ora_tt set variables nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF';
alter tenant ora_tt set variables nls_timestamp_tz_format='YYYY-MM-DD HH24:MI:SS.FF TZR TZD';
\ No newline at end of file
system sleep 5;
alter system set balancer_idle_time = '60s';
create user 'admin' IDENTIFIED BY 'admin';
use oceanbase;
create database if not exists test;
use test;
grant all on *.* to 'admin' WITH GRANT OPTION;
set global ob_enable_jit='OFF';
alter system set large_query_threshold='1s';
alter system set syslog_level='info';
alter system set syslog_io_bandwidth_limit='30M';
alter system set trx_try_wait_lock_timeout='0';
alter system set zone_merge_concurrency=0;
alter system set merger_completion_percentage=100;
alter system set trace_log_slow_query_watermark='500ms';
alter system set minor_freeze_times=30;
alter system set clog_sync_time_warn_threshold = '1000ms';
alter system set trace_log_slow_query_watermark = '10s';
alter system set enable_sql_operator_dump = 'false';
alter system set rpc_timeout=1000000000;
create resource unit tpch_box1 min_memory '100g', max_memory '100g', max_disk_size '1000g', max_session_num 64, min_cpu=9, max_cpu=9, max_iops 128, min_iops=128;
create resource pool tpch_pool1 unit = 'tpch_box1', unit_num = 1, zone_list = ('z1', 'z2', 'z3');
create tenant oracle replica_num = 3, resource_pool_list=('tpch_pool1') set ob_tcp_invited_nodes='%', ob_compatibility_mode='oracle';
alter tenant oracle set variables autocommit='on';
alter tenant oracle set variables nls_date_format='yyyy-mm-dd hh24:mi:ss';
alter tenant oracle set variables nls_timestamp_format='yyyy-mm-dd hh24:mi:ss.ff';
alter tenant oracle set variables nls_timestamp_tz_format='yyyy-mm-dd hh24:mi:ss.ff tzr tzd';
alter tenant oracle set variables ob_query_timeout=7200000000;
alter tenant oracle set variables ob_trx_timeout=7200000000;
alter tenant oracle set variables max_allowed_packet=67108864;
alter tenant oracle set variables ob_enable_jit='OFF';
alter tenant oracle set variables ob_sql_work_area_percentage=80;
alter tenant oracle set variables parallel_max_servers=512;
alter tenant oracle set variables parallel_servers_target=512;
select count(*) from oceanbase.__all_server group by zone limit 1 into @num;
set @sql_text = concat('alter resource pool tpch_pool1', ' unit_num = ', @num);
prepare stmt from @sql_text;
execute stmt;
deallocate prepare stmt;
......@@ -21,7 +21,7 @@
from __future__ import absolute_import, division, print_function
def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
def connect(plugin_context, target_server=None, *args, **kwargs):
stdio = plugin_context.stdio
cluster_config = plugin_context.cluster_config
servers = cluster_config.servers
......
......@@ -20,12 +20,15 @@
from __future__ import absolute_import, division, print_function
global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/*' % (path))
if not ret:
global global_ret
global_ret = False
stdio.warn('fail to clean %s:%s' % (server, path))
else:
......@@ -33,7 +36,6 @@ def destroy(plugin_context, *args, **kwargs):
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
global_ret = True
stdio.start_loading('obagent work dir cleaning')
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
......
......@@ -55,8 +55,8 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib,conf,log}'" % (home_path)) \
and client.execute_command("cp -r %s/conf %s/" % (remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/bin ]; then ln -s %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -s %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
global_ret = False
stdio.error('fail to init %s home path', server)
......
......@@ -60,8 +60,10 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
with open(path) as f:
data = yaml.load(f)['configs']
for config in data:
key = list(config['value'].keys())[0]
config_kv[key] = config['key']
key = config.get('value')
if key and isinstance(key, dict):
key = list(key.keys())[0]
config_kv[key] = key
global_ret = True
for server in servers:
......
......@@ -155,6 +155,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
"zone_name": "zone",
}
stdio.start_loading('Start obproxy')
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
......@@ -195,80 +196,102 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
continue
for comp in ['oceanbase', 'oceanbase-ce']:
obs_config = cluster_config.get_depled_config(comp, server)
if obs_config is not None:
break
if obs_config is None:
obs_config = {}
for key in config_map:
k = config_map[key]
if not server_config.get(key):
server_config[key] = obs_config.get(k, default_server_config.get(key))
for key in default_server_config:
if not server_config.get(key):
server_config[key] = default_server_config.get(key)
server_config['host_ip'] = server.ip
for key in server_config:
if server_config[key] is None:
server_config[key] = ''
if isinstance(server_config[key], bool):
server_config[key] = str(server_config[key]).lower()
if server_config.get('crypto_method', 'plain').lower() == 'aes':
secret_key = generate_aes_b64_key()
crypto_path = server_config.get('crypto_path', 'conf/.config_secret.key')
crypto_path = os.path.join(home_path, crypto_path)
client.execute_command('echo "%s" > %s' % (secret_key.decode('utf-8') if isinstance(secret_key, bytes) else secret_key, crypto_path))
for key in need_encrypted:
value = server_config.get(key)
if value:
server_config[key] = encrypt(secret_key, value)
for path in config_files:
with tempfile.NamedTemporaryFile(suffix=".yaml", mode='w') as tf:
text = config_files[path].format(**server_config)
text = text.replace('\[[', '{').replace('\]]', '}')
tf.write(text)
tf.flush()
if not client.put_file(tf.name, path.replace(repository_dir, home_path)):
if getattr(options, 'without_parameter', False) and client.execute_command('ls %s/conf/monagent.yaml' % home_path):
use_parameter = False
else:
use_parameter = True
if use_parameter:
for comp in ['oceanbase', 'oceanbase-ce']:
obs_config = cluster_config.get_depled_config(comp, server)
if obs_config is not None:
break
if obs_config is None:
obs_config = {}
for key in config_map:
k = config_map[key]
if not server_config.get(key):
server_config[key] = obs_config.get(k, default_server_config.get(key))
for key in default_server_config:
if not server_config.get(key):
server_config[key] = default_server_config.get(key)
server_config['host_ip'] = server.ip
for key in server_config:
if server_config[key] is None:
server_config[key] = ''
if isinstance(server_config[key], bool):
server_config[key] = str(server_config[key]).lower()
if server_config.get('crypto_method', 'plain').lower() == 'aes':
secret_key = generate_aes_b64_key()
crypto_path = server_config.get('crypto_path', 'conf/.config_secret.key')
crypto_path = os.path.join(home_path, crypto_path)
client.execute_command('echo "%s" > %s' % (secret_key.decode('utf-8') if isinstance(secret_key, bytes) else secret_key, crypto_path))
for key in need_encrypted:
value = server_config.get(key)
if value:
server_config[key] = encrypt(secret_key, value)
for path in config_files:
stdio.verbose('format %s' % path)
with tempfile.NamedTemporaryFile(suffix=".yaml", mode='w') as tf:
text = config_files[path].format(**server_config)
text = text.replace('\[[', '{').replace('\]]', '}')
tf.write(text)
tf.flush()
if not client.put_file(tf.name, path.replace(repository_dir, home_path)):
stdio.error('Fail to send config file to %s' % server)
stdio.stop_loading('fail')
return
for path in glob(os.path.join(repository_dir, 'conf/*/*')):
if path.endswith('.yaml'):
continue
if os.path.isdir(path):
ret = client.put_dir(path, path.replace(repository_dir, home_path))
else:
ret = client.put_file(path, path.replace(repository_dir, home_path))
if not ret:
stdio.error('Fail to send config file to %s' % server)
stdio.stop_loading('fail')
return
config = {
'log': {
'level': server_config.get('log_level', 'info'),
'filename': server_config.get('log_path', 'log/monagent.log'),
'maxsize': int(server_config.get('log_size', 30)),
'maxage': int(server_config.get('log_expire_day', 7)),
'maxbackups': int(server_config.get('maxbackups', 10)),
'localtime': True if server_config.get('log_use_localtime', True) else False,
'compress': True if server_config.get('log_compress', True) else False
},
'server': {
'address': '0.0.0.0:%d' % int(server_config.get('server_port', 8088)),
'adminAddress': '0.0.0.0:%d' % int(server_config.get('pprof_port', 8089)),
'runDir': 'run'
},
'cryptoMethod': server_config['crypto_method'] if server_config.get('crypto_method').lower() in ['aes', 'plain'] else 'plain',
'cryptoPath': server_config.get('crypto_path'),
'modulePath': 'conf/module_config',
'propertiesPath': 'conf/config_properties'
}
with tempfile.NamedTemporaryFile(suffix=".yaml") as tf:
yaml.dump(config, tf)
if not client.put_file(tf.name, os.path.join(home_path, 'conf/monagent.yaml')):
stdio.error('Fail to send config file to %s' % server)
stdio.stop_loading('fail')
return
config = {
'log': {
'level': server_config.get('log_level', 'info'),
'filename': server_config.get('log_path', 'log/monagent.log'),
'maxsize': int(server_config.get('log_size', 30)),
'maxage': int(server_config.get('log_expire_day', 7)),
'maxbackups': int(server_config.get('maxbackups', 10)),
'localtime': True if server_config.get('log_use_localtime', True) else False,
'compress': True if server_config.get('log_compress', True) else False
},
'server': {
'address': '0.0.0.0:%d' % int(server_config.get('server_port', 8088)),
'adminAddress': '0.0.0.0:%d' % int(server_config.get('pprof_port', 8089)),
'runDir': 'run'
},
'cryptoMethod': server_config['crypto_method'] if server_config.get('crypto_method').lower() in ['aes', 'plain'] else 'plain',
'cryptoPath': server_config.get('crypto_path'),
'modulePath': 'conf/module_config',
'propertiesPath': 'conf/config_properties'
}
with tempfile.NamedTemporaryFile(suffix=".yaml") as tf:
yaml.dump(config, tf)
if not client.put_file(tf.name, os.path.join(home_path, 'conf/monagent.yaml')):
stdio.error('Fail to send config file to %s' % server)
return
log_path = '%s/log/monagent_stdout.log' % home_path
client.execute_command('cd %s;nohup %s/bin/monagent -c conf/monagent.yaml >> %s 2>&1 & echo $! > %s' % (home_path, home_path, log_path, remote_pid_path))
stdio.stop_loading('succeed')
stdio.start_loading('obagent program health check')
time.sleep(1)
failed = []
......
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
def upgrade(plugin_context, search_py_script_plugin, apply_param_plugin, *args, **kwargs):
components = plugin_context.components
clients = plugin_context.clients
cluster_config = plugin_context.cluster_config
cmd = plugin_context.cmd
options = plugin_context.options
stdio = plugin_context.stdio
upgrade_ctx = kwargs.get('upgrade_ctx')
local_home_path = kwargs.get('local_home_path')
upgrade_repositories = kwargs.get('upgrade_repositories')
cur_repository = upgrade_repositories[0]
dest_repository = upgrade_repositories[-1]
repository_dir = dest_repository.repository_dir
kwargs['repository_dir'] = repository_dir
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/lib/* %s/lib" % (remote_repository_dir, home_path))
stop_plugin = search_py_script_plugin([cur_repository], 'stop')[cur_repository]
start_plugin = search_py_script_plugin([dest_repository], 'start')[dest_repository]
connect_plugin = search_py_script_plugin([dest_repository], 'connect')[dest_repository]
display_plugin = search_py_script_plugin([dest_repository], 'display')[dest_repository]
apply_param_plugin(cur_repository)
if not stop_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs):
return
apply_param_plugin(dest_repository)
if not start_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs):
return
ret = connect_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs)
if ret and display_plugin(components, clients, cluster_config, cmd, options, stdio, ret.get_return('cursor'), *args, **kwargs):
upgrade_ctx['index'] = len(upgrade_repositories)
return plugin_context.return_true()
- name: home_path
require: true
type: STRING
need_restart: true
description_en: working directory for obagent
description_local: Obagent工作目录
- name: server_port
require: true
type: INT
default: 8088
min_value: 1025
max_value: 65535
need_restart: true
description_en: port number for pulling metrics and management
description_local: 提供拉取 metrics 和管理的端口
- name: pprof_port
require: true
type: INT
default: 8089
min_value: 1025
max_value: 65535
need_restart: true
description_en: port number for pprof
description_local: pprof 调试端口
- name: log_level
require: true
type: STRING
default: INFO
min_value: NULL
max_value: NULL
need_restart: true
description_en: log level
description_local: 日志等级
- name: log_path
require: true
type: STRING
default: log/monagent.log
min_value: NULL
max_value: NULL
need_restart: true
description_en: log path
description_local: 日志路径
- name: crypto_method
require: true
type: STRING
default: plain
min_value: NULL
max_value: NULL
need_restart: true
description_en: crypto method {aes/plain}
description_local: 加密方式,仅支持 aes 和 plain
- name: crypto_path
require: true
type: STRING
default: conf/.config_secret.key
min_value: NULL
max_value: NULL
need_restart: true
description_en: file path for crypto key
description_local: 秘钥存放路径
- name: log_size
require: true
type: INT
default: 30
min_value: 1
max_value: 256
need_restart: true
description_en: size for a log file, measured in megabytes
description_local: 单个日志文件大小,单位为 M
- name: log_expire_day
require: true
type: INT
default: 7
min_value: 1
max_value: 30
need_restart: true
description_en: log file expiration time, measured in days
description_local: 日志保留天数
- name: log_file_count
require: true
type: INT
default: 10
min_value: 1
max_value: NULL
need_restart: true
description_en: the maximum number for log files. The default value is 10.
description_local: 最大保留日志数
- name: log_use_localtime
require: true
type: BOOL
default: true
min_value: NULL
max_value: NULL
need_restart: true
description_en: whether to use local time for log files
description_local: 日志文件是否使用本地时间
- name: log_compress
require: true
type: BOOL
default: true
min_value: NULL
max_value: NULL
need_restart: true
description_en: whether to enable log compression
description_local: 是否开启日志压缩
- name: http_basic_auth_user
require: true
type: STRING
default: admin
min_value: NULL
max_value: NULL
need_restart: false
description_en: username for HTTP authentication
description_local: HTTP 服务认证用户名
- name: http_basic_auth_password
require: false
type: STRING
default: root
min_value: NULL
max_value: NULL
need_restart: false
description_en: password for HTTP authentication
description_local: HTTP 服务认证密码
- name: pprof_basic_auth_user
require: true
type: STRING
default: admin
min_value: NULL
max_value: NULL
need_restart: false
description_en: username for debug service
description_local: debug 接口认证用户名
- name: pprof_basic_auth_password
require: false
type: STRING
default: root
min_value: NULL
max_value: NULL
need_restart: false
description_en: password for debug service
description_local: debug 接口认证密码
- name: monitor_user
require: true
type: STRING
default: root
min_value: NULL
max_value: NULL
need_restart: false
description_en: monitor username for OceanBase Database. The user must have read access to OceanBase Database as a system tenant.
description_local: OceanBase 数据库监控数据采集用户名, 需要该用户具有sys租户下 oceanbase 库的读权限
- name: monitor_password
require: false
type: STRING
default: NULL
min_value: NULL
max_value: NULL
need_restart: false
description_en: monitor password for OceanBase Database
description_local: OceanBase 数据库监控数据采集用户密码
- name: sql_port
require: false
type: INT
default: 2881
min_value: 1025
max_value: 65535
need_restart: false
description_en: SQL port for observer
description_local: observer的 SQL 端口
- name: rpc_port
require: false
type: INT
default: 2882
min_value: 1025
max_value: 65535
need_restart: false
description_en: the RPC port for observer
description_local: observer 的 RPC 端口
- name: cluster_name
require: false
type: STRING
default: obcluster
min_value: NULL
max_value: NULL
need_restart: false
description_en: cluster name for OceanBase Database
description_local: OceanBase Database 集群名
- name: cluster_id
require: false
type: INT
default: 1
min_value: 1
max_value: 4294901759
need_restart: false
description_en: cluster ID for OceanBase Database
description_local: OceanBase 集群 ID
- name: zone_name
require: false
type: STRING
default: zone1
min_value: NULL
max_value: NULL
need_restart: false
description_en: zone name for your observer
description_local: observer 所在的 zone 名字
- name: ob_monitor_status
require: true
type: STRING
default: active
min_value: NULL
max_value: NULL
need_restart: false
description_en: monitor status for OceanBase Database. Active is to enable. Inactive is to disable.
description_local: OceanBase 监控指标采集状态,active 表示开启,inactive 表示关闭
- name: host_monitor_status
require: true
type: STRING
default: active
min_value: NULL
max_value: NULL
need_restart: false
description_en: monitor status for your host. Active is to enable. Inactive is to disable.
description_local: 主机监控指标采集状态, active 表示开启, inactive 表示关闭
- name: disable_http_basic_auth
require: true
type: BOOL
default: false
min_value: NULL
max_value: NULL
need_restart: false
description_en: whether to disable the basic authentication for HTTP service. True is to disable. False is to enable.
description_local: 是否禁用 HTTP 服务的basic auth 认证,true 表示禁用,false 表示不禁用
- name: disable_pprof_basic_auth
require: true
type: BOOL
default: false
min_value: NULL
max_value: NULL
need_restart: false
description_en: whether to disable the basic authentication for the debug interface. True is to disable. False is to enable.
description_local: 是否禁用 debug 接口的basic auth 认证,true 表示禁用,false 表示不禁用
- name: ob_log_monitor_status
require: true
type: STRING
default: inactive
min_value: NULL
max_value: NULL
need_restart: false
description_en: status for OceanBase Database log alarm. Active is to enable. Inactive is to disable.
description_local: OceanBase 日志报警状态,active 表示开启,inactive 表示关闭
- name: alertmanager_address
require: false
type: STRING
default: ''
min_value: NULL
max_value: NULL
need_restart: false
description_en: Alertmanager address, needed when log alarm is enabled
description_local: Alertmanager 地址, 当日志报警开启时需要
- name: ob_install_path
require: false
type: STRING
default: ''
min_value: NULL
max_value: NULL
need_restart: false
description_en: Working directory for OceanBase Database, needed when log alarm is enabled.
description_local: OceanBase 安装路径, 当日志报警开启时需要
\ No newline at end of file
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
import re
import sys
import time
import random
import base64
import tempfile
from glob import glob
from copy import deepcopy
from Crypto import Random
from Crypto.Cipher import AES
from tool import YamlLoader
stdio = None
if sys.version_info.major == 2:
def generate_key(key):
genKey = [chr(0)] * 16
for i in range(min(16, len(key))):
genKey[i] = key[i]
i = 16
while i < len(key):
j = 0
while j < 16 and i < len(key):
genKey[j] = chr(ord(genKey[j]) ^ ord(key[i]))
j, i = j+1, i+1
return "".join(genKey)
class AESCipher:
bs = AES.block_size
def __init__(self, key):
self.key = generate_key(key)
def encrypt(self, message):
message = self._pad(message)
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return base64.b64encode(iv + cipher.encrypt(message)).decode('utf-8')
def _pad(self, s):
return s + (self.bs - len(s) % self.bs) * chr(self.bs - len(s) % self.bs)
else:
def generate_key(key):
genKey = [0] * 16
for i in range(min(16, len(key))):
genKey[i] = key[i]
i = 16
while i < len(key):
j = 0
while j < 16 and i < len(key):
genKey[j] = genKey[j] ^ key[i]
j, i = j+1, i+1
genKey = [chr(k) for k in genKey]
return bytes("".join(genKey), encoding="utf-8")
class AESCipher:
bs = AES.block_size
def __init__(self, key):
self.key = generate_key(key)
def encrypt(self, message):
message = self._pad(message)
iv = Random.new().read(AES.block_size)
cipher = AES.new(self.key, AES.MODE_CBC, iv)
return str(base64.b64encode(iv + cipher.encrypt(bytes(message, encoding='utf-8'))), encoding="utf-8")
def _pad(self, s):
return s + (self.bs - len(s) % self.bs) * chr(self.bs - len(s) % self.bs)
def encrypt(key, data):
key = base64.b64decode(key)
cipher = AESCipher(key)
return cipher.encrypt(data)
def get_port_socket_inode(client, port):
port = hex(port)[2:].zfill(4).upper()
cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port
res = client.execute_command(cmd)
if not res or not res.stdout.strip():
return False
stdio.verbose(res.stdout)
return res.stdout.strip().split('\n')
def confirm_port(client, pid, port):
socket_inodes = get_port_socket_inode(client, port)
if not socket_inodes:
return False
ret = client.execute_command("ls -l /proc/%s/fd/ |grep -E 'socket:\[(%s)\]'" % (pid, '|'.join(socket_inodes)))
if ret and ret.stdout.strip():
return True
return False
def generate_aes_b64_key():
n = random.randint(1, 3) * 8
key = []
c = 0
while c < n:
key += chr(random.randint(33, 127))
c += 1
key = ''.join(key)
return base64.b64encode(key.encode('utf-8'))
def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
global stdio
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
options = plugin_context.options
config_files = {}
pid_path = {}
targets = []
yaml = YamlLoader(stdio)
need_encrypted = []
config_map = {
"monitor_password": "root_password",
"sql_port": "mysql_port",
"rpc_port": "rpc_port",
"cluster_name": "appname",
"cluster_id": "cluster_id",
"zone_name": "zone",
"ob_install_path": "home_path"
}
stdio.start_loading('Start obproxy')
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
targets.append('%s:%s' % (server.ip, server_config["server_port"]))
for path in glob(os.path.join(repository_dir, 'conf/*/*.yaml')):
with open(path) as f:
text = f.read()
target = set(re.findall('\n((\s+)-\s+\{target\})', text))
for pt in target:
text = text.replace(pt[0], ('%s- ' % pt[1]) + ('\n%s- ' % pt[1]).join(targets))
keys = set(re.findall('\${([\.\w]+)\}', text))
for key in keys:
text = text.replace('${%s}' % key, '$\[[%s\]]' % key)
config_files[path] = text
for path in glob(os.path.join(repository_dir, 'conf/config_properties/*.yaml')):
with open(path) as f:
data = yaml.load(f).get('configs', [])
for conf in data:
if conf.get('encrypted'):
key = conf.get('value')
if key and isinstance(key, dict):
key = list(key.keys())[0]
need_encrypted.append(key)
for server in cluster_config.servers:
client = clients[server]
server_config = deepcopy(cluster_config.get_server_conf(server))
default_server_config = cluster_config.get_server_conf_with_default(server)
obs_config = {}
home_path = server_config['home_path']
remote_pid_path = '%s/run/obagent-%s-%s.pid' % (home_path, server.ip, server_config["server_port"])
pid_path[server] = remote_pid_path
remote_pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
continue
for comp in ['oceanbase', 'oceanbase-ce']:
obs_config = cluster_config.get_depled_config(comp, server)
if obs_config is not None:
break
if obs_config is None:
obs_config = {}
for key in config_map:
k = config_map[key]
if not server_config.get(key):
server_config[key] = obs_config.get(k, default_server_config.get(key))
for key in default_server_config:
if not server_config.get(key):
server_config[key] = default_server_config.get(key)
server_config['host_ip'] = server.ip
for key in server_config:
if server_config[key] is None:
server_config[key] = ''
if isinstance(server_config[key], bool):
server_config[key] = str(server_config[key]).lower()
if server_config.get('crypto_method', 'plain').lower() == 'aes':
secret_key = generate_aes_b64_key()
crypto_path = server_config.get('crypto_path', 'conf/.config_secret.key')
crypto_path = os.path.join(home_path, crypto_path)
client.execute_command('echo "%s" > %s' % (secret_key.decode('utf-8') if isinstance(secret_key, bytes) else secret_key, crypto_path))
for key in need_encrypted:
value = server_config.get(key)
if value:
server_config[key] = encrypt(secret_key, value)
for path in config_files:
stdio.verbose('format %s' % path)
with tempfile.NamedTemporaryFile(suffix=".yaml", mode='w') as tf:
text = config_files[path].format(**server_config)
text = text.replace('\[[', '{').replace('\]]', '}')
tf.write(text)
tf.flush()
if not client.put_file(tf.name, path.replace(repository_dir, home_path)):
stdio.error('Fail to send config file to %s' % server)
stdio.stop_loading('fail')
return
for path in glob(os.path.join(repository_dir, 'conf/*/*')):
if path.endswith('.yaml'):
continue
if os.path.isdir(path):
ret = client.put_dir(path, path.replace(repository_dir, home_path))
else:
ret = client.put_file(path, path.replace(repository_dir, home_path))
if not ret:
stdio.error('Fail to send config file to %s' % server)
stdio.stop_loading('fail')
return
config = {
'log': {
'level': server_config.get('log_level', 'info'),
'filename': server_config.get('log_path', 'log/monagent.log'),
'maxsize': int(server_config.get('log_size', 30)),
'maxage': int(server_config.get('log_expire_day', 7)),
'maxbackups': int(server_config.get('maxbackups', 10)),
'localtime': True if server_config.get('log_use_localtime', True) else False,
'compress': True if server_config.get('log_compress', True) else False
},
'server': {
'address': '0.0.0.0:%d' % int(server_config.get('server_port', 8088)),
'adminAddress': '0.0.0.0:%d' % int(server_config.get('pprof_port', 8089)),
'runDir': 'run'
},
'cryptoMethod': server_config['crypto_method'] if server_config.get('crypto_method').lower() in ['aes', 'plain'] else 'plain',
'cryptoPath': server_config.get('crypto_path'),
'modulePath': 'conf/module_config',
'propertiesPath': 'conf/config_properties'
}
with tempfile.NamedTemporaryFile(suffix=".yaml") as tf:
yaml.dump(config, tf)
if not client.put_file(tf.name, os.path.join(home_path, 'conf/monagent.yaml')):
stdio.error('Fail to send config file to %s' % server)
stdio.stop_loading('fail')
return
log_path = '%s/log/monagent_stdout.log' % home_path
client.execute_command('cd %s;nohup %s/bin/monagent -c conf/monagent.yaml >> %s 2>&1 & echo $! > %s' % (home_path, home_path, log_path, remote_pid_path))
stdio.stop_loading('succeed')
stdio.start_loading('obagent program health check')
time.sleep(1)
failed = []
fail_time = 0
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
stdio.verbose('%s program health check' % server)
pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
if pid:
if confirm_port(client, pid, int(server_config["server_port"])):
stdio.verbose('%s obagent[pid: %s] started', server, pid)
client.execute_command('echo %s > %s' % (pid, pid_path[server]))
else:
fail_time += 1
else:
failed.append('failed to start %s obagent' % server)
if failed:
stdio.stop_loading('fail')
for msg in failed:
stdio.warn(msg)
plugin_context.return_false()
else:
stdio.stop_loading('succeed')
plugin_context.return_true(need_bootstrap=False)
......@@ -24,17 +24,22 @@ from __future__ import absolute_import, division, print_function
def bootstrap(plugin_context, cursor, *args, **kwargs):
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
global_ret = True
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
for key in ['observer_sys_password', 'obproxy_sys_password']:
if server_config.get(key):
sql = 'alter proxyconfig set %s = %%s' % key
value = None
try:
value = str(server_config[key])
stdio.verbose('execute sql: %s' % (sql % value))
cursor[server].execute(sql, [value])
except:
stdio.exception('execute sql exception: %s' % (sql % (value)))
stdio.warm('failed to set %s for obproxy(%s)' % (key, server))
plugin_context.return_true()
sql = 'alter proxyconfig set %s = %%s' % key
value = None
try:
value = server_config.get(key, '')
value = '' if value is None else str(value)
stdio.verbose('execute sql: %s' % (sql % value))
cursor[server].execute(sql, [value])
except:
stdio.exception('execute sql exception: %s' % (sql % (value)))
stdio.error('failed to set %s for obproxy(%s)' % (key, server))
global_ret = False
if global_ret:
plugin_context.return_true()
else:
plugin_context.return_false()
......@@ -42,6 +42,19 @@ def _connect(ip, port, user, password=''):
return db, cursor
def execute(cursor, query, args=None):
msg = query % tuple(args) if args is not None else query
stdio.verbose('execute sql: %s' % msg)
# stdio.verbose("query: %s. args: %s" % (query, args))
try:
cursor.execute(query, args)
return cursor.fetchone()
except:
msg = 'execute sql exception: %s' % msg
stdio.exception(msg)
raise Exception(msg)
def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
global stdio
count = 10
......@@ -54,10 +67,30 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
else:
servers = cluster_config.servers
stdio.start_loading('Connect to obproxy')
if sys_root:
user = 'root@proxysys'
else:
user = 'root'
user = kwargs.get('user')
password = kwargs.get('password')
if not user:
if sys_root:
user = 'root@proxysys'
else:
user = 'root'
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
ob_config = cluster_config.get_depled_config(comp)
if not ob_config:
continue
odp_config = cluster_config.get_global_conf()
config_map = {
'observer_sys_password': 'proxyro_password',
'cluster_name': 'appname',
'root_password': 'observer_root_password'
}
for key in config_map:
ob_key = config_map[key]
if key not in odp_config and ob_key in ob_config:
cluster_config.update_global_conf(key, ob_config.get(ob_key), save=False)
break
dbs = {}
cursors = {}
while count and servers:
......@@ -66,8 +99,16 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
for server in servers:
try:
server_config = cluster_config.get_server_conf(server)
pwd_key = 'obproxy_sys_password' if sys_root else 'observer_sys_password'
db, cursor = _connect(server.ip, server_config['listen_port'], user, server_config.get(pwd_key, '') if count % 2 else '')
if sys_root:
pwd_key = 'obproxy_sys_password'
default_pwd = 'proxysys'
else:
pwd_key = 'observer_root_password'
default_pwd = ''
r_password = password if password else server_config.get(pwd_key)
if r_password is None:
r_password = ''
db, cursor = _connect(server.ip, server_config['listen_port'], user, r_password if count % 2 else default_pwd)
dbs[server] = db
cursors[server] = cursor
except:
......@@ -76,7 +117,7 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
servers = tmp_servers
servers and time.sleep(3)
if servers:
if servers:
stdio.stop_loading('fail')
return plugin_context.return_false()
else:
......
......@@ -20,6 +20,8 @@
from __future__ import absolute_import, division, print_function
global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
......@@ -27,6 +29,7 @@ def destroy(plugin_context, *args, **kwargs):
ret = client.execute_command('rm -fr %s/* %s/.conf' % (path, path))
if not ret:
# pring stderror
global global_ret
global_ret = False
stdio.warn('fail to clean %s:%s' % (server, path))
else:
......@@ -34,7 +37,6 @@ def destroy(plugin_context, *args, **kwargs):
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
global_ret = True
stdio.start_loading('obproxy work dir cleaning')
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
......@@ -44,4 +46,4 @@ def destroy(plugin_context, *args, **kwargs):
stdio.stop_loading('succeed')
plugin_context.return_true()
else:
stdio.stop_loading('fail')
\ No newline at end of file
stdio.stop_loading('fail')
- src_path: ./home/admin/obproxy-3.1.0/bin/obproxy
- src_path: ./home/admin/obproxy-$version/bin/obproxy
target_path: bin/obproxy
type: bin
mode: 755
\ No newline at end of file
......@@ -41,8 +41,8 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.error('failed to initialize %s home path: %s' % (server, ret.stderr))
continue
if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib}'" % (home_path)) \
and client.execute_command("if [ -d %s/bin ]; then ln -s %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -s %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
global_ret = False
stdio.error('fail to init %s home path', server)
......
......@@ -242,6 +242,13 @@
max_value: true
need_restart: false
description_en: if client connections reach throttle, true is that new connection will be accepted, and eliminate lru client connection, false is that new connection will disconnect, and err packet will be returned
- name: max_connections
type: INT
default: 60000
min_value: 0
max_value: 65535
need_restart: false
description_en: max fd proxy could use
- name: client_max_connections
type: INT
default: 8192
......@@ -438,4 +445,11 @@
max_value: ''
min_value: ''
need_restart: false
description_en: password of observer proxyro user
\ No newline at end of file
description_en: password of observer proxyro user
- name: observer_root_password
type: STRING
default: ''
max_value: ''
min_value: ''
need_restart: false
description_en: password of observer root user
\ No newline at end of file
......@@ -95,7 +95,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
clusters_cmd = {}
real_cmd = {}
pid_path = {}
need_bootstrap = True
need_bootstrap = False
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
......@@ -118,7 +118,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
}
for key in config_map:
ob_key = config_map[key]
if not odp_config.get(key) and ob_config.get(ob_key):
if key not in odp_config and ob_key in ob_config:
cluster_config.update_global_conf(key, ob_config.get(ob_key), save=False)
break
......@@ -133,6 +133,15 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
return plugin_context.return_false()
stdio.start_loading('Start obproxy')
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
if not client.execute_command('ls %s/etc/obproxy_config.bin' % home_path):
need_bootstrap = True
break
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
......@@ -147,7 +156,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
pid_path[server] = "%s/run/obproxy-%s-%s.pid" % (home_path, server.ip, server_config["listen_port"])
if getattr(options, 'without_parameter', False) and client.execute_command('ls %s/etc/obproxy_config.bin' % home_path):
if getattr(options, 'without_parameter', False) and need_bootstrap is False:
use_parameter = False
else:
use_parameter = True
......@@ -159,9 +168,9 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
'rs_list',
'cluster_name'
]
start_unuse = ['home_path', 'observer_sys_password', 'obproxy_sys_password']
start_unuse = ['home_path', 'observer_sys_password', 'obproxy_sys_password', 'observer_root_password']
get_value = lambda key: "'%s'" % server_config[key] if isinstance(server_config[key], str) else server_config[key]
opt_str = []
opt_str = ["obproxy_sys_password=e3fd448c516073714189b57233c9cf428ccb1bed"]
for key in server_config:
if key not in start_unuse and key not in not_opt_str:
value = get_value(key)
......@@ -186,9 +195,9 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
remote_pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
cmd = real_cmd[server].replace('\'', '')
if remote_pid:
ret = client.execute_command('cat /proc/%s/cmdline' % remote_pid)
ret = client.execute_command('ls /proc/%s/' % remote_pid)
if ret:
if ret.stdout.replace('\0', '') == cmd.strip().replace(' ', ''):
if confirm_port(client, remote_pid, port):
continue
stdio.stop_loading('fail')
stdio.error('%s:%s port is already used' % (server.ip, port))
......@@ -206,28 +215,35 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.stop_loading('succeed')
stdio.start_loading('obproxy program health check')
time.sleep(3)
failed = []
fail_time = 0
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
client = clients[server]
stdio.verbose('%s program health check' % server)
remote_pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
if remote_pid:
for pid in remote_pid.split('\n'):
confirm = confirm_port(client, pid, int(server_config["listen_port"]))
if confirm:
stdio.verbose('%s obproxy[pid: %s] started', server, pid)
client.execute_command('echo %s > %s' % (pid, pid_path[server]))
obproxyd(server_config["home_path"], client, server.ip, server_config["listen_port"])
break
else:
fail_time += 1
if fail_time == len(remote_pid.split('\n')):
servers = cluster_config.servers
count = 4
while servers and count:
count -= 1
tmp_servers = []
for server in servers:
server_config = cluster_config.get_server_conf(server)
client = clients[server]
stdio.verbose('%s program health check' % server)
remote_pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
if remote_pid:
for pid in remote_pid.split('\n'):
confirm = confirm_port(client, pid, int(server_config["listen_port"]))
if confirm:
stdio.verbose('%s obproxy[pid: %s] started', server, pid)
client.execute_command('echo %s > %s' % (pid, pid_path[server]))
obproxyd(server_config["home_path"], client, server.ip, server_config["listen_port"])
break
stdio.verbose('failed to start %s obproxy, remaining retries: %d' % (server, count))
if count:
tmp_servers.append(server)
else:
failed.append('failed to start %s obproxy' % server)
else:
failed.append('failed to start %s obproxy' % server)
else:
failed.append('failed to start %s obproxy' % server)
servers = tmp_servers
if servers and count:
time.sleep(1)
if failed:
stdio.stop_loading('fail')
for msg in failed:
......@@ -235,4 +251,4 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
plugin_context.return_false()
else:
stdio.stop_loading('succeed')
plugin_context.return_true(need_bootstrap=True)
plugin_context.return_true(need_bootstrap=need_bootstrap)
......@@ -62,7 +62,7 @@ def start_check(plugin_context, strict_check=False, *args, **kwargs):
remote_pid_path = "%s/run/obproxy-%s-%s.pid" % (server_config['home_path'], server.ip, server_config["listen_port"])
remote_pid = client.execute_command("cat %s" % remote_pid_path).stdout.strip()
if remote_pid:
if client.execute_command('ls /proc/%s' % remote_pid):
if client.execute_command('ls /proc/%s/fd' % remote_pid):
continue
if ip not in servers_port:
......
......@@ -55,6 +55,7 @@ def stop(plugin_context, *args, **kwargs):
servers = {}
stdio.start_loading('Stop obproxy')
success = True
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
client = clients[server]
......@@ -64,8 +65,8 @@ def stop(plugin_context, *args, **kwargs):
remote_pid_path = '%s/run/obproxy-%s-%s.pid' % (server_config["home_path"], server.ip, server_config["listen_port"])
obproxyd_pid_path = '%s/run/obproxyd-%s-%s.pid' % (server_config["home_path"], server.ip, server_config["listen_port"])
remote_pid = client.execute_command('cat %s' % remote_pid_path).stdout.strip()
if remote_pid:
if client.execute_command('ls /proc/%s' % remote_pid):
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
if client.execute_command('ls /proc/%s/fd' % remote_pid):
stdio.verbose('%s obproxy[pid:%s] stopping ...' % (server, remote_pid))
client.execute_command('cat %s | xargs kill -9; kill -9 -%s' % (obproxyd_pid_path, remote_pid))
servers[server] = {
......@@ -75,8 +76,14 @@ def stop(plugin_context, *args, **kwargs):
'pid': remote_pid,
'path': remote_pid_path
}
else:
stdio.verbose('failed to stop obproxy[pid:%s] in %s, permission deny' % (remote_pid, server))
success = False
else:
stdio.verbose('%s obproxy is not running' % server)
if not success:
stdio.stop_loading('fail')
return plugin_context.return_false()
count = 10
check = lambda client, pid, port: confirm_port(client, pid, port) if count < 5 else get_port_socket_inode(client, port)
......
......@@ -21,17 +21,22 @@
from __future__ import absolute_import, division, print_function
def upgrade(plugin_context, stop_plugin, start_plugin, connect_plugin, display_plugin, *args, **kwargs):
def upgrade(plugin_context, search_py_script_plugin, apply_param_plugin, *args, **kwargs):
components = plugin_context.components
clients = plugin_context.clients
cluster_config = plugin_context.cluster_config
cmd = plugin_context.cmd
options = plugin_context.options
stdio = plugin_context.stdio
upgrade_ctx = kwargs.get('upgrade_ctx')
local_home_path = kwargs.get('local_home_path')
repository_dir = kwargs.get('repository_dir')
upgrade_repositories = kwargs.get('upgrade_repositories')
cur_repository = upgrade_repositories[0]
dest_repository = upgrade_repositories[-1]
repository_dir = dest_repository.repository_dir
kwargs['repository_dir'] = repository_dir
for server in cluster_config.servers:
client = clients[server]
......@@ -43,11 +48,20 @@ def upgrade(plugin_context, stop_plugin, start_plugin, connect_plugin, display_p
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/lib/* %s/lib" % (remote_repository_dir, home_path))
stop_plugin = search_py_script_plugin([cur_repository], 'stop')[cur_repository]
start_plugin = search_py_script_plugin([dest_repository], 'start')[dest_repository]
connect_plugin = search_py_script_plugin([dest_repository], 'connect')[dest_repository]
display_plugin = search_py_script_plugin([dest_repository], 'display')[dest_repository]
apply_param_plugin(cur_repository)
if not stop_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs):
return
apply_param_plugin(dest_repository)
if not start_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs):
return
ret = connect_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs)
if ret and display_plugin(components, clients, cluster_config, cmd, options, stdio, ret.get_return('cursor'), *args, **kwargs):
upgrade_ctx['index'] = len(upgrade_repositories)
return plugin_context.return_true()
- src_path: ./home/admin/obproxy-3.2.0/bin/obproxy
- src_path: ./home/admin/obproxy-$version/bin/obproxy
target_path: bin/obproxy
type: bin
mode: 755
\ No newline at end of file
oceanbase
\ No newline at end of file
......@@ -58,7 +58,7 @@ def bootstrap(plugin_context, cursor, *args, **kwargs):
sql = 'grant select on oceanbase.* to proxyro IDENTIFIED BY "%s"' % value
stdio.verbose(sql)
cursor.execute(sql)
if global_conf.get('root_password'):
if global_conf.get('root_password') is not None:
sql = 'alter user "root" IDENTIFIED BY "%s"' % global_conf.get('root_password')
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql)
......
......@@ -61,11 +61,11 @@ def connect(plugin_context, target_server=None, *args, **kwargs):
try:
server_config = cluster_config.get_server_conf(server)
password = server_config.get('root_password', '') if count % 2 else ''
db, cursor = _connect(server.ip, server_config['mysql_port'], password)
db, cursor = _connect(server.ip, server_config['mysql_port'], password if password is not None else '')
stdio.stop_loading('succeed')
return plugin_context.return_true(connect=db, cursor=cursor)
return plugin_context.return_true(connect=db, cursor=cursor, server=server)
except:
pass
stdio.exception('')
time.sleep(3)
stdio.stop_loading('fail')
......
......@@ -202,11 +202,11 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
min_iops = get_option('min_iops', max_iops)
if cpu_total < max_cpu:
return error('resource not enough: cpu (Avail: %s, Need: %s)' % (max_cpu, cpu_total))
return error('resource not enough: cpu (Avail: %s, Need: %s)' % (cpu_total, max_cpu))
if mem_total < max_memory:
return error('resource not enough: memory (Avail: %s, Need: %s)' % (formate_size(max_memory), formate_size(mem_total)))
return error('resource not enough: memory (Avail: %s, Need: %s)' % (formate_size(mem_total), formate_size(max_memory)))
if disk_total < max_disk_size:
return error('resource not enough: disk space (Avail: %s, Need: %s)' % (formate_size(max_disk_size), formate_size(disk_total)))
return error('resource not enough: disk space (Avail: %s, Need: %s)' % (formate_size(disk_total), formate_size(max_disk_size)))
if max_iops < MIN_IOPS:
return error('max_iops must greater than %d' % MIN_IOPS)
......
......@@ -20,6 +20,8 @@
from __future__ import absolute_import, division, print_function
global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
......@@ -27,14 +29,14 @@ def destroy(plugin_context, *args, **kwargs):
ret = client.execute_command('rm -fr %s/* %s/.conf' % (path, path))
if not ret:
# print stderror
global global_ret
global_ret = False
stdio.warn('fail to clean %s:%s', (server, path))
stdio.warn('fail to clean %s:%s' % (server, path))
else:
stdio.verbose('%s:%s cleaned' % (server, path))
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
global_ret = True
stdio.start_loading('observer work dir cleaning')
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
......@@ -47,4 +49,4 @@ def destroy(plugin_context, *args, **kwargs):
stdio.stop_loading('succeed')
plugin_context.return_true()
else:
stdio.stop_loading('fail')
\ No newline at end of file
stdio.stop_loading('fail')
- src_path: ./home/admin/oceanbase/bin/observer
target_path: bin/observer
type: bin
mode: 755
\ No newline at end of file
mode: 755
- src_path: ./home/admin/oceanbase/etc
target_path: etc
type: dir
\ No newline at end of file
......@@ -56,8 +56,10 @@ def format_size(size, precision=1):
def get_system_memory(memory_limit):
if memory_limit <= (64 << 30):
system_memory = memory_limit * 0.5
else:
elif memory_limit <= (150 << 30):
system_memory = memory_limit * 0.4
else:
system_memory = memory_limit * 0.3
system_memory = max(4 << 30, system_memory)
return format_size(system_memory, 0)
......@@ -143,7 +145,7 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
free_memory = parse_size(str(v))
memory_limit = free_memory
if memory_limit < MIN_MEMORY:
stdio.errorn('(%s) not enough memory. (Free: %s, Need: %s)' % (ip, format_size(free_memory), format_size(MIN_MEMORY)))
stdio.error('(%s) not enough memory. (Free: %s, Need: %s)' % (ip, format_size(free_memory), format_size(MIN_MEMORY)))
success = False
continue
memory_limit = max(MIN_MEMORY, memory_limit * 0.9)
......@@ -171,7 +173,7 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
if ret and ret.stdout.strip().isdigit():
cpu_num = int(ret.stdout)
server_config['cpu_count'] = max(16, int(cpu_num * 0.8))
server_config['cpu_count'] = max(16, int(cpu_num * - 2))
else:
server_config['cpu_count'] = 16
......@@ -188,7 +190,19 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
'avail': int(avail) << 10,
'need': 0,
}
for include_dir in dirs.values():
while include_dir not in disk:
ret = client.execute_command('df --block-size=1024 %s' % include_dir)
if ret:
for total, used, avail, puse, path in re.findall('(\d+)\s+(\d+)\s+(\d+)\s+(\d+%)\s+(.+)', ret.stdout):
disk[path] = {
'total': int(total) << 10,
'avail': int(avail) << 10,
'need': 0,
}
break
else:
include_dir = os.path.dirname(include_dir)
mounts = {}
for key in dirs:
path = dirs[key]
......
......@@ -116,8 +116,8 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
else:
critical('fail to init %s home path: create %s failed' % (server, home_path))
ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log,bin,lib}"' % home_path) \
and client.execute_command("if [ -d %s/bin ]; then ln -s %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -s %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))
if ret:
data_path = server_config['data_dir']
if force:
......
......@@ -12,6 +12,7 @@
default: 1
min_value: 1
max_value: 4294901759
modify_limit: modify
need_restart: true
description_en: ID of the cluster
description_local: 本OceanBase集群ID
......@@ -63,6 +64,7 @@
default: 2882
min_value: 1025
max_value: 65535
modify_limit: modify
need_restart: true
description_en: the port number for RPC protocol.
description_local: 集群内部通信的端口号
......@@ -72,6 +74,7 @@
default: 2881
min_value: 1025
max_value: 65535
modify_limit: modify
need_restart: true
description_en: port number for mysql connection
description_local: SQL服务协议端口号
......@@ -105,16 +108,6 @@
need_restart: false
description_en: separate system and user commit log. The default value is false.
description_local: 是否把系统事务日志与用户事务日志分开存储
- name: min_observer_version
require: false
type: STRING
default: 1.1.0
min_value: NULL
max_value: NULL
section: ROOT_SERVICE
need_restart: false
description_en: the min observer version
description_local: 本集群最小的observer程序版本号
- name: sys_cpu_limit_trigger
require: false
type: INT
......@@ -131,6 +124,7 @@
default: 80
min_value: 10
max_value: 90
modify_limit: decrease
section: OBSERVER
need_restart: false
description_en: memory limit percentage of the total physical memory
......@@ -207,7 +201,7 @@
description_local: 位置缓存刷新请求的最小间隔,防止产生过多刷新请求造成系统压力过大
- name: trace_log_slow_query_watermark
type: TIME
default: 100ms
default: 1s
min_value: 1ms
max_value: NULL
section: OBSERVER
......@@ -300,6 +294,7 @@
default: 0
min_value: 0M
max_value: NULL
modify_limit: decrease
section: SSTABLE
need_restart: false
description_en: size of the data file.
......@@ -417,7 +412,7 @@
- name: freeze_trigger_percentage
require: false
type: INT
default: 70
default: 50
min_value: 1
max_value: 99
section: TENANT
......@@ -856,7 +851,7 @@
- name: enable_merge_by_turn
require: false
type: BOOL
default: true
default: false
min_value: NULL
max_value: NULL
section: DAILY_MERGE
......@@ -908,7 +903,7 @@
type: INT
default: 0
min_value: 0
max_value: 64
max_value: 256
section: OBSERVER
need_restart: false
description_en: worker thread num for compaction
......@@ -926,9 +921,9 @@
- name: net_thread_count
require: false
type: INT
default: 12
min_value: 1
max_value: 100
default: 0
min_value: 0
max_value: 128
section: OBSERVER
need_restart: true
description_en: the number of rpc/mysql I/O threads for Libeasy.
......@@ -946,7 +941,7 @@
- name: minor_freeze_times
require: false
type: INT
default: 5
default: 100
min_value: 0
max_value: 65535
section: TENANT
......@@ -1286,7 +1281,7 @@
description_local: 备副本的事务日志和主副本差距超过该阈值时,触发副本重建
- name: system_memory
type: CAPACITY
default: 16G
default: 30G
min_value: 0M
max_value: NULL
section: OBSERVER
......@@ -1346,7 +1341,7 @@
- name: default_row_format
require: false
type: STRING
default: dynamic
default: compact
min_value: NULL
max_value: NULL
section: OBSERVER
......@@ -1399,6 +1394,7 @@
default: 0
min_value: NULL
max_value: NULL
modify_limit: decrease
section: OBSERVER
need_restart: false
description_en: the size of the memory reserved for internal use(for testing purpose)
......@@ -1407,7 +1403,7 @@
require: true
type: INT
default: 268435456
min_value: 536870912
min_value: 268435456
max_value:
need_restart: false
description_en: the minimum memory limit of the resource pool
......@@ -1755,7 +1751,7 @@
- name: clog_sync_time_warn_threshold
require: false
type: TIME
default: 100ms
default: 1s
min_value: 1ms
max_value: 10000ms
section: TRANS
......@@ -2028,6 +2024,7 @@
default: 90
min_value: 5
max_value: 99
modify_limit: decrease
section: SSTABLE
need_restart: false
description_en: the percentage of disk space used by the data files.
......@@ -2035,7 +2032,7 @@
- name: default_compress_func
require: false
type: STRING
default: zstd_1.0
default: zstd_1.3.8
min_value: NULL
max_value: NULL
section: OBSERVER
......
......@@ -100,8 +100,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -s %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -s %s/lib/* %s/lib" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/lib/* %s/lib" % (remote_repository_dir, home_path))
if not server_config.get('data_dir'):
server_config['data_dir'] = '%s/store' % home_path
......
......@@ -226,6 +226,23 @@ def _start_check(plugin_context, strict_check=False, *args, **kwargs):
'need': 0,
'threshold': 2
}
all_path = set(list(servers_disk[ip].keys()) + list(servers_clog_mount[ip].keys()))
for include_dir in all_path:
while include_dir not in disk:
ret = client.execute_command('df --block-size=1024 %s' % include_dir)
if ret:
for total, used, avail, puse, path in re.findall('(\d+)\s+(\d+)\s+(\d+)\s+(\d+%)\s+(.+)',
ret.stdout):
disk[path] = {
'total': int(total) << 10,
'avail': int(avail) << 10,
'need': 0,
'threshold': 2
}
break
else:
include_dir = os.path.dirname(include_dir)
stdio.verbose('disk: {}'.format(disk))
for path in servers_disk[ip]:
kp = '/'
for p in disk:
......
此差异已折叠。
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
def upgrade_check(plugin_context, current_repository, repositories, route, cursor, *args, **kwargs):
def execute_sql(query, args=None, one=True, error=True):
msg = query % tuple(args) if args is not None else query
stdio.verbose("query: %s. args: %s" % (query, args))
try:
stdio.verbose('execute sql: %s' % msg)
cursor.execute(query, args)
result = cursor.fetchone() if one else cursor.fetchall()
result and stdio.verbose(result)
return result
except:
msg = 'execute sql exception: %s' % msg if error else ''
stdio.exception(msg)
return False
options = plugin_context.options
stdio = plugin_context.stdio
cluster_config = plugin_context.cluster_config
skip_check = getattr(options, 'skip_check', False)
can_skip = ['upgrade_checker.py', 'upgrade_post_checker.py']
large_upgrade_need = ['upgrade_pre.py', 'upgrade_post.py']
zones = set()
for server in cluster_config.servers:
config = cluster_config.get_server_conf_with_default(server)
zone = config['zone']
zones.add(zone)
if len(zones) > 2:
tenants = execute_sql('select * from oceanbase.gv$tenant', one=False)
for tenant in tenants:
zone_list = tenant.get('zone_list', '').split(';')
if len(zone_list) < 3:
stdio.error('Tenant %s does not meet rolling upgrade conditions (zone number greater than 2).' % tenant.get('tenant_name'))
return
succeed = True
n, i = len(route), 1
while i < n:
cant_use = False
node = route[i]
repository = repositories[i]
stdio.verbose('route %s-%s use %s. file check begin.' % (node.get('version'), node.get('release'), repository))
script_dir = os.path.join(repository.repository_dir, 'etc/direct_upgrade') if node.get('direct_upgrade') else os.path.join(repository.repository_dir, 'etc')
if skip_check is False:
for name in can_skip:
path = os.path.join(script_dir, name)
if not os.path.isfile(path):
succeed = False
stdio.error('No such file: %s . You can use --skip-check to skip this check or --disable to ban this package' % path)
if repository.version != current_repository.version:
for name in large_upgrade_need:
path = os.path.join(script_dir, name)
if not os.path.isfile(path):
cant_use = True
succeed = False
stdio.error('No such file: %s .' % path)
if cant_use:
stdio.error('%s 不可用于升级,可以使用--disable禁用该镜像' % repository)
i += 1
if succeed:
plugin_context.return_true()
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
def upgrade_file_check(plugin_context, current_repository, repositories, route, *args, **kwargs):
options = plugin_context.options
stdio = plugin_context.stdio
skip_check = getattr(options, 'skip_check', False)
can_skip = ['upgrade_checker.py', 'upgrade_post_checker.py']
large_upgrade_need = ['upgrade_pre.py', 'upgrade_post.py']
succeed = True
n, i = len(route), 1
while i < n:
cant_use = False
node = route[i]
repository = repositories[i]
stdio.verbose('route %s-%s use %s. file check begin.' % (node.get('version'), node.get('release'), repository))
script_dir = os.path.join(repository.repository_dir, 'etc/direct_upgrade') if node.get('direct_upgrade') else os.path.join(repository.repository_dir, 'etc')
if skip_check is False:
for name in can_skip:
path = os.path.join(script_dir, name)
if not os.path.isfile(path):
succeed = False
stdio.error('No such file: %s . You can use --skip-check to skip this check or --disable to ban this package' % path)
if repository.version != current_repository.version:
for name in large_upgrade_need:
path = os.path.join(script_dir, name)
if not os.path.isfile(path):
cant_use = True
succeed = False
stdio.error('No such file: %s .' % path)
if cant_use:
stdio.error('%s 不可用于升级,可以使用--disable禁用该镜像' % repository)
i += 1
if succeed:
plugin_context.return_true()
\ No newline at end of file
此差异已折叠。
此差异已折叠。
......@@ -143,14 +143,6 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
stdio.error('fail to get tenant info')
return
sql = "select * from oceanbase.__all_user where user_name = '%s'" % user
sys_pwd = cluster_config.get_global_conf().get('root_password', '')
exec_sql_cmd = "%s -h%s -P%s -uroot@%s %s -A -e" % (obclient_bin, host, port, tenant_name, ("-p'%s'" % sys_pwd) if sys_pwd else '')
ret = LocalClient.execute_command('%s "%s"' % (exec_sql_cmd, sql), stdio=stdio)
if not ret or not ret.stdout:
stdio.error('User %s not exists.' % user)
return
if not_test_only:
sql_cmd_prefix = '%s -h%s -P%s -u%s@%s %s -A' % (obclient_bin, host, port, user, tenant_name, ("-p'%s'" % password) if password else '')
ret = local_execute_command('%s -e "%s"' % (sql_cmd_prefix, 'create database if not exists %s' % mysql_db))
......@@ -161,6 +153,12 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
else:
sql_cmd_prefix = '%s -h%s -P%s -u%s@%s %s -D %s -A' % (obclient_bin, host, port, user, tenant_name, ("-p'%s'" % password) if password else '', mysql_db)
ret = LocalClient.execute_command('%s -e "%s"' % (sql_cmd_prefix, 'select version();'), stdio=stdio)
if not ret:
stdio.error(ret.stderr)
return
for server in cluster_config.servers:
client = clients[server]
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
......
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册