未验证 提交 274bc962 编写于 作者: L LIN 提交者: GitHub

Merge pull request #130 from frf12/v1.5.0

v1.5.0
此差异已折叠。
此差异已折叠。
......@@ -49,7 +49,8 @@ class InitDirFailedErrorMessage(object):
PERMISSION_DENIED = ': {path} permission denied .'
DOC_LINK_MSG = 'See https://open.oceanbase.com/docs/obd-cn/V1.4.0/10000000000436999 .'
DOC_LINK = '<DOC_LINK>'
DOC_LINK_MSG = 'See {}'.format(DOC_LINK if DOC_LINK else "https://open.oceanbase.com/docs/obd-cn/V1.4.0/10000000000436999 .")
EC_CONFIG_CONFLICT_PORT = OBDErrorCode(1000, 'Configuration conflict {server1}:{port} port is used for {server2}\'s {key}')
EC_CONFLICT_PORT = OBDErrorCode(1001, '{server}:{port} port is already used')
......
......@@ -64,7 +64,7 @@ class MixLock(object):
FileUtil.exclusive_lock_obj(self.lock_obj, stdio=self.stdio)
except Exception as e:
raise LockError(e)
def _sh_lock(self):
if self.lock_obj:
try:
......@@ -100,7 +100,7 @@ class MixLock(object):
except Exception as e:
self.stdio and getattr(self.stdio, 'stop_loading', print)('fail')
raise LockError(e)
def _lock_escalation(self, try_times):
stdio = self.stdio
while try_times:
......
......@@ -22,9 +22,10 @@ from __future__ import absolute_import, division, print_function
import os
from tool import DirectoryUtil
from _stdio import SafeStdio
class Manager(object):
class Manager(SafeStdio):
RELATIVE_PATH = ''
......
......@@ -40,11 +40,10 @@ except:
from _arch import getArchList, getBaseArch
from _rpm import Package, PackageInfo
from tool import ConfigUtil, FileUtil
from tool import ConfigUtil, FileUtil, var_replace
from _manager import Manager
_KEYCRE = re.compile(r"\$(\w+)")
_ARCH = getArchList()
_RELEASE = None
SUP_MAP = {
......@@ -118,7 +117,7 @@ class MirrorRepository(object):
self.stdio and getattr(self.stdio, 'verbose', print)('pkg %s is %s, but %s is required' % (key, getattr(pkg, key), pattern[key]))
return None
return pkg
def get_rpm_pkg_by_info(self, pkg_info):
return None
......@@ -286,14 +285,20 @@ class RemoteMirrorRepository(MirrorRepository):
if self._db is None:
fp = FileUtil.unzip(file_path, stdio=self.stdio)
if not fp:
FileUtil.rm(file_path, stdio=self.stdio)
return []
self._db = {}
parser = cElementTree.iterparse(fp)
for event, elem in parser:
if RemoteMirrorRepository.ns_cleanup(elem.tag) == 'package' and elem.attrib.get('type') == 'rpm':
info = RemotePackageInfo(elem)
self._db[info.md5] = info
self._dump_db_cache()
try:
parser = cElementTree.iterparse(fp)
for event, elem in parser:
if RemoteMirrorRepository.ns_cleanup(elem.tag) == 'package' and elem.attrib.get('type') == 'rpm':
info = RemotePackageInfo(elem)
self._db[info.md5] = info
self._dump_db_cache()
except:
FileUtil.rm(file_path, stdio=self.stdio)
self.stdio and self.stdio.critical('failed to parse file %s, please retry later.' % file_path)
return []
return self._db
def _load_db_cache(self, path):
......@@ -341,29 +346,6 @@ class RemoteMirrorRepository(MirrorRepository):
def get_db_cache_file(mirror_path):
return os.path.join(mirror_path, RemoteMirrorRepository.DB_CACHE_FILE)
@staticmethod
def var_replace(string, var):
if not var:
return string
done = []
while string:
m = _KEYCRE.search(string)
if not m:
done.append(string)
break
varname = m.group(1).lower()
replacement = var.get(varname, m.group())
start, end = m.span()
done.append(string[:start])
done.append(str(replacement))
string = string[end:]
return ''.join(done)
def _load_repo_age(self):
try:
with open(self.get_repo_age_file(self.mirror_path), 'r') as f:
......@@ -817,8 +799,8 @@ class MirrorRepositorySection(object):
def get_mirror(self, server_vars, stdio=None):
meta_data = self.meta_data
meta_data['name'] = RemoteMirrorRepository.var_replace(meta_data['name'], server_vars)
meta_data['baseurl'] = RemoteMirrorRepository.var_replace(meta_data['baseurl'], server_vars)
meta_data['name'] = var_replace(meta_data['name'], server_vars)
meta_data['baseurl'] = var_replace(meta_data['baseurl'], server_vars)
mirror_path = os.path.join(self.remote_path, meta_data['name'])
mirror = RemoteMirrorRepository(mirror_path, meta_data, stdio)
return mirror
......@@ -947,9 +929,9 @@ class MirrorRepositoryManager(Manager):
def get_mirrors(self, is_enabled=True):
self._lock()
mirros = self.get_remote_mirrors(is_enabled=is_enabled)
mirros.append(self.local_mirror)
return mirros
mirrors = self.get_remote_mirrors(is_enabled=is_enabled)
mirrors.append(self.local_mirror)
return mirrors
def get_exact_pkg(self, **pattern):
only_info = 'only_info' in pattern and pattern['only_info']
......
......@@ -29,6 +29,7 @@ from copy import deepcopy
from _manager import Manager
from _rpm import Version
from ssh import ConcurrentExecutor
from tool import ConfigUtil, DynamicLoading, YamlLoader
......@@ -124,6 +125,7 @@ class PluginContext(object):
self.options = options
self.dev_mode = dev_mode
self.stdio = stdio
self.concurrent_exector = ConcurrentExecutor(32)
self._return = PluginReturn()
def get_return(self):
......@@ -164,7 +166,8 @@ class ScriptPlugin(Plugin):
def __getattr__(self, key):
def new_method(*args, **kwargs):
kwargs['stdio'] = self.stdio
if "stdio" not in kwargs:
kwargs['stdio'] = self.stdio
return attr(*args, **kwargs)
attr = getattr(self.client, key)
if hasattr(attr, '__call__'):
......@@ -595,12 +598,18 @@ class InstallPlugin(Plugin):
DIR = 1
BIN = 2
class InstallMethod(Enum):
ANY = 0
CP = 1
class FileItem(object):
def __init__(self, src_path, target_path, _type):
def __init__(self, src_path, target_path, _type, install_method):
self.src_path = src_path
self.target_path = target_path
self.type = _type if _type else InstallPlugin.FileItemType.FILE
self.install_method = install_method or InstallPlugin.InstallMethod.ANY
PLUGIN_TYPE = PluginType.INSTALL
FILES_MAP_YAML = 'file_map.yaml'
......@@ -611,6 +620,7 @@ class InstallPlugin(Plugin):
super(InstallPlugin, self).__init__(component_name, plugin_path, version, dev_mode)
self.file_map_path = os.path.join(self.plugin_path, self.FILES_MAP_YAML)
self._file_map = {}
self._file_map_data = None
@classmethod
def var_replace(cls, string, var):
......@@ -634,6 +644,13 @@ class InstallPlugin(Plugin):
return ''.join(done)
@property
def file_map_data(self):
if self._file_map_data is None:
with open(self.file_map_path, 'rb') as f:
self._file_map_data = yaml.load(f)
return self._file_map_data
def file_map(self, package_info):
var = {
'name': package_info.name,
......@@ -646,17 +663,17 @@ class InstallPlugin(Plugin):
if not self._file_map.get(key):
try:
file_map = {}
with open(self.file_map_path, 'rb') as f:
for data in yaml.load(f):
k = data['src_path']
if k[0] != '.':
k = '.%s' % os.path.join('/', k)
k = self.var_replace(k, var)
file_map[k] = InstallPlugin.FileItem(
k,
ConfigUtil.get_value_from_dict(data, 'target_path', k),
getattr(InstallPlugin.FileItemType, ConfigUtil.get_value_from_dict(data, 'type', 'FILE').upper(), None)
)
for data in self.file_map_data:
k = data['src_path']
if k[0] != '.':
k = '.%s' % os.path.join('/', k)
k = self.var_replace(k, var)
file_map[k] = InstallPlugin.FileItem(
k,
ConfigUtil.get_value_from_dict(data, 'target_path', k),
getattr(InstallPlugin.FileItemType, ConfigUtil.get_value_from_dict(data, 'type', 'FILE').upper(), None),
getattr(InstallPlugin.InstallMethod, ConfigUtil.get_value_from_dict(data, 'install_method', 'ANY').upper(), None),
)
self._file_map[key] = file_map
except:
pass
......
......@@ -31,6 +31,7 @@ from _arch import getBaseArch
from tool import DirectoryUtil, FileUtil, YamlLoader
from _manager import Manager
from _plugin import InstallPlugin
from ssh import LocalClient
class LocalPackage(Package):
......@@ -121,10 +122,15 @@ class LocalPackage(Package):
filelinktos.append(os.readlink(target_path))
filemodes.append(-24065)
else:
m = hashlib.md5()
with open(target_path, 'rb') as f:
m.update(f.read())
m_value = m.hexdigest().encode(sys.getdefaultencoding())
ret = LocalClient().execute_command('md5sum {}'.format(target_path))
if ret:
m_value = ret.stdout.strip().split(' ')[0].encode('utf-8')
else:
m = hashlib.md5()
with open(target_path, 'rb') as f:
m.update(f.read())
m_value = m.hexdigest().encode(sys.getdefaultencoding())
# raise Exception('Failed to get md5sum for {}, error: {}'.format(target_path, ret.stderr))
m_sum.update(m_value)
filemd5s.append(m_value)
filelinktos.append('')
......@@ -158,7 +164,7 @@ class Repository(PackageInfo):
return self.md5
def __str__(self):
return '%s-%s-%s' % (self.name, self.version, self.hash)
return '%s-%s-%s-%s' % (self.name, self.version, self.release, self.hash)
def __hash__(self):
return hash(self.repository_dir)
......@@ -380,48 +386,29 @@ class ComponentRepository(object):
repositories[repository.hash] = repository
return repositories
def get_repository_by_version(self, version, tag=None):
if tag:
return self.get_repository_by_tag(tag, version)
repository = self.get_repository_by_tag(self.name, version)
if repository:
return repository
path_partten = os.path.join(self.repository_dir, version, tag if tag else '*')
for path in glob(path_partten):
n_repository = Repository(self.name, path, self.stdio)
if n_repository.hash and n_repository > repository:
repository = n_repository
return repository
def get_repository_by_tag(self, tag, version=None):
path_partten = os.path.join(self.repository_dir, version if version else '*', tag)
def search_repository(self, version=None, tag=None, release=None):
path_pattern = os.path.join(self.repository_dir, version or '*', tag or '*')
repository = None
for path in glob(path_partten):
for path in glob(path_pattern):
n_repository = Repository(self.name, path, self.stdio)
if release and release != n_repository.release:
continue
if n_repository.hash and n_repository > repository:
repository = n_repository
return repository
def get_repository(self, version=None, tag=None):
if tag:
return self.get_repository_by_tag(tag, version)
if version:
return self.get_repository_by_version(version, tag)
version = None
for rep_version in os.listdir(self.repository_dir):
rep_version = Version(rep_version)
if rep_version > version:
version = rep_version
if version:
return self.get_repository_by_version(version, tag)
return None
def get_repository(self, version=None, tag=None, release=None):
if version or tag or release:
return self.search_repository(version=version, tag=tag, release=release)
else:
return self.search_repository(tag=self.name) or self.search_repository()
def get_repositories(self, version=None):
if not version:
version = '*'
repositories = []
path_partten = os.path.join(self.repository_dir, version, '*')
for path in glob(path_partten):
path_pattern = os.path.join(self.repository_dir, version, '*')
for path in glob(path_pattern):
repository = Repository(self.name, path, self.stdio)
if repository.hash:
repositories.append(repository)
......@@ -436,7 +423,7 @@ class RepositoryManager(Manager):
def __init__(self, home_path, lock_manager=None, stdio=None):
super(RepositoryManager, self).__init__(home_path, stdio=stdio)
self.repositories = {}
self.component_repositoies = {}
self.component_repositories = {}
self.lock_manager = lock_manager
def _lock(self, read_only=False):
......@@ -460,20 +447,20 @@ class RepositoryManager(Manager):
def get_repositories(self, name, version=None, instance=True):
repositories = []
for repository in self.get_component_repositoy(name).get_repositories(version):
for repository in self.get_component_repository(name).get_repositories(version):
if instance and repository.is_shadow_repository() is False:
repositories.append(repository)
return repositories
def get_repositories_view(self, name=None):
if name:
repositories = self.get_component_repositoy(name).get_repositories()
repositories = self.get_component_repository(name).get_repositories()
else:
repositories = []
path_partten = os.path.join(self.path, '*')
for path in glob(path_partten):
path_pattern = os.path.join(self.path, '*')
for path in glob(path_pattern):
_, name = os.path.split(path)
repositories += self.get_component_repositoy(name).get_repositories()
repositories += self.get_component_repository(name).get_repositories()
repositories_vo = {}
for repository in repositories:
......@@ -487,36 +474,46 @@ class RepositoryManager(Manager):
repositories_vo[repository] = self._get_repository_vo(repository)
return list(repositories_vo.values())
def get_component_repositoy(self, name):
if name not in self.component_repositoies:
def get_component_repository(self, name):
if name not in self.component_repositories:
self._lock(True)
path = os.path.join(self.path, name)
self.component_repositoies[name] = ComponentRepository(name, path, self.stdio)
return self.component_repositoies[name]
def get_repository_by_version(self, name, version, tag=None, instance=True):
if not tag:
tag = name
path = os.path.join(self.path, name, version, tag)
if path not in self.repositories:
component_repositoy = self.get_component_repositoy(name)
repository = component_repositoy.get_repository(version, tag)
if repository:
self.repositories[repository.repository_dir] = repository
self.repositories[path] = repository
self.component_repositories[name] = ComponentRepository(name, path, self.stdio)
return self.component_repositories[name]
def get_repository(self, name, version=None, tag=None, release=None, package_hash=None, instance=True):
self.stdio.verbose(
"Search repository {name} version: {version}, tag: {tag}, release: {release}, package_hash: {package_hash}".format(
name=name, version=version, tag=tag, release=release, package_hash=package_hash))
tag = tag or package_hash
component_repository = self.get_component_repository(name)
if version and tag:
repository_dir = os.path.join(self.path, name, version, tag)
if repository_dir in self.repositories:
repository = self.repositories[repository_dir]
else:
repository = component_repository.get_repository(version=version, tag=tag, release=release)
else:
repository = component_repository.get_repository(version=version, tag=tag, release=release)
if not repository:
return None
else:
repository = self.repositories[path]
if repository.repository_dir not in self.repositories:
self.repositories[repository.repository_dir] = repository
else:
repository = self.repositories[repository.repository_dir]
if not self._check_repository_pattern(repository, version=version, release=release, hash=package_hash):
return None
self.stdio.verbose("Found repository {}".format(repository))
return self.get_instance_repository_from_shadow(repository) if instance else repository
def get_repository(self, name, version=None, tag=None, instance=True):
if version:
return self.get_repository_by_version(name, version, tag)
component_repositoy = self.get_component_repositoy(name)
repository = component_repositoy.get_repository(version, tag)
if repository:
self.repositories[repository.repository_dir] = repository
return self.get_instance_repository_from_shadow(repository) if repository and instance else repository
def _check_repository_pattern(self, repository, **kwargs):
for key in ["version", "release", "hash"]:
current_value = getattr(repository, key)
if kwargs.get(key) is not None and current_value != kwargs[key]:
self.stdio.verbose("repository {} is {}, but {} is required".format(key, current_value, kwargs[key]))
return False
return True
def create_instance_repository(self, name, version, _hash):
path = os.path.join(self.path, name, version, _hash)
......@@ -534,7 +531,7 @@ class RepositoryManager(Manager):
self._lock(True)
self.repositories[path] = Repository(name, path, self.stdio)
return self.repositories[path]
repository = Repository(name, path, self.stdio)
repository = Repository(name, path, self.stdio)
repository.set_version(version)
return repository
......
......@@ -24,12 +24,16 @@ import os
import signal
import sys
import traceback
import inspect2
import six
from enum import Enum
from halo import Halo, cursor
from colorama import Fore
from prettytable import PrettyTable
from progressbar import AdaptiveETA, Bar, SimpleProgress, ETA, FileTransferSpeed, Percentage, ProgressBar
from types import MethodType
from inspect2 import Parameter
if sys.version_info.major == 3:
......@@ -74,8 +78,8 @@ class FormtatText(object):
return FormtatText.format(text, Fore.RED)
class LogSymbols(Enum):
class LogSymbols(Enum):
INFO = FormtatText.info('!')
SUCCESS = FormtatText.success('ok')
WARNING = FormtatText.warning('!!')
......@@ -112,7 +116,7 @@ class IOTable(PrettyTable):
val = 'l'
for field in self._field_names:
self._align[field] = val
class IOHalo(Halo):
......@@ -230,14 +234,14 @@ class IO(object):
WARNING_PREV = FormtatText.warning('[WARN]')
ERROR_PREV = FormtatText.error('[ERROR]')
IS_TTY = sys.stdin.isatty()
def __init__(self,
level,
msg_lv=MsgLevel.DEBUG,
trace_logger=None,
def __init__(self,
level,
msg_lv=MsgLevel.DEBUG,
trace_logger=None,
use_cache=False,
track_limit=0,
root_io=None,
track_limit=0,
root_io=None,
stream=sys.stdout
):
self.level = level
......@@ -258,7 +262,7 @@ class IO(object):
if self._root_io:
self._root_io.log_cache
return self._log_cache
def before_close(self):
if self._before_critical:
try:
......@@ -272,7 +276,7 @@ class IO(object):
def __del__(self):
self._close()
def exit(self, code):
self._close()
sys.exit(code)
......@@ -280,14 +284,14 @@ class IO(object):
def set_cache(self, status):
if status:
self._cache_on()
def _cache_on(self):
if self._root_io:
return False
if self.log_cache is None:
self._log_cache = []
return True
def _cache_off(self):
if self._root_io:
return False
......@@ -359,7 +363,7 @@ class IO(object):
finally:
self._clear_sync_ctx()
return ret
def start_loading(self, text, *arg, **kwargs):
if self.sync_obj:
return False
......@@ -405,7 +409,7 @@ class IO(object):
if not isinstance(self.sync_obj, IOProgressBar):
return False
return self._stop_sync_obj(IOProgressBar, 'interrupt')
def sub_io(self, pid=None, msg_lv=None):
if not pid:
pid = os.getpid()
......@@ -414,16 +418,20 @@ class IO(object):
key = "%s-%s" % (pid, msg_lv)
if key not in self.sub_ios:
self.sub_ios[key] = self.__class__(
self.level + 1,
msg_lv=msg_lv,
self.level + 1,
msg_lv=msg_lv,
trace_logger=self.trace_logger,
track_limit=self.track_limit,
root_io=self._root_io if self._root_io else self
)
return self.sub_ios[key]
def print_list(self, ary, field_names=None, exp=lambda x: x if isinstance(x, list) else [x], show_index=False, start=0, **kwargs):
def print_list(self, ary, field_names=None, exp=lambda x: x if isinstance(x, (list, tuple)) else [x], show_index=False, start=0, **kwargs):
if not ary:
title = kwargs.get("title", "")
empty_msg = kwargs.get("empty_msg", "{} is empty.".format(title))
if empty_msg:
self.print(empty_msg)
return
show_index = field_names is not None and show_index
if show_index:
......@@ -464,7 +472,7 @@ class IO(object):
kwargs['file'] and print(self._format(msg, *args), **kwargs)
del kwargs['file']
self.log(msg_lv, msg, *args, **kwargs)
def log(self, levelno, msg, *args, **kwargs):
self._cache_log(levelno, msg, *args, **kwargs)
......@@ -478,13 +486,11 @@ class IO(object):
else:
log_cache.append((levelno, line, args, kwargs))
def _flush_log(self):
if not self._root_io and self.trace_logger and self._log_cache:
for levelno, line, args, kwargs in self._log_cache:
self.trace_logger.log(levelno, line, *args, **kwargs)
self._log_cache = []
def _log(self, levelno, msg, *args, **kwargs):
if self.trace_logger:
self.trace_logger.log(levelno, msg, *args, **kwargs)
......@@ -560,3 +566,144 @@ class IO(object):
msg and self.error(msg)
print_stack(''.join(lines))
class _Empty(object):
pass
EMPTY = _Empty()
del _Empty
class FakeReturn(object):
def __call__(self, *args, **kwargs):
return None
def __len__(self):
return 0
FAKE_RETURN = FakeReturn()
class StdIO(object):
def __init__(self, io=None):
self.io = io
self._attrs = {}
self._warn_func = getattr(self.io, "warn", print)
def __getattr__(self, item):
if self.io is None:
return FAKE_RETURN
if item not in self._attrs:
attr = getattr(self.io, item, EMPTY)
if attr is not EMPTY:
self._attrs[item] = attr
else:
self._warn_func(FormtatText.warning("WARNING: {} has no attribute '{}'".format(self.io, item)))
self._attrs[item] = FAKE_RETURN
return self._attrs[item]
FAKE_IO = StdIO()
def get_stdio(io_obj):
if io_obj is None:
return FAKE_IO
elif isinstance(io_obj, StdIO):
return io_obj
else:
return StdIO(io_obj)
def safe_stdio_decorator(default_stdio=None):
def decorated(func):
is_bond_method = False
_type = None
if isinstance(func, (staticmethod, classmethod)):
is_bond_method = True
_type = type(func)
func = func.__func__
all_parameters = inspect2.signature(func).parameters
if "stdio" in all_parameters:
default_stdio_in_params = all_parameters["stdio"].default
if not isinstance(default_stdio_in_params, Parameter.empty):
_default_stdio = default_stdio_in_params or default_stdio
def func_wrapper(*args, **kwargs):
_params_keys = list(all_parameters.keys())
_index = _params_keys.index("stdio")
if "stdio" not in kwargs and len(args) > _index:
stdio = get_stdio(args[_index])
tmp_args = list(args)
tmp_args[_index] = stdio
args = tuple(tmp_args)
else:
stdio = get_stdio(kwargs.get("stdio", _default_stdio))
kwargs["stdio"] = stdio
return func(*args, **kwargs)
return _type(func_wrapper) if is_bond_method else func_wrapper
else:
return _type(func) if is_bond_method else func
return decorated
class SafeStdioMeta(type):
@staticmethod
def _init_wrapper_func(func):
def wrapper(*args, **kwargs):
setattr(args[0], "_wrapper_func", {})
func(*args, **kwargs)
if "stdio" in args[0].__dict__:
args[0].__dict__["stdio"] = get_stdio(args[0].__dict__["stdio"])
if func.__name__ != wrapper.__name__:
return wrapper
else:
return func
def __new__(mcs, name, bases, attrs):
for key, attr in attrs.items():
if key.startswith("__") and key.endswith("__"):
continue
if isinstance(attr, (staticmethod, classmethod)):
attrs[key] = safe_stdio_decorator()(attr)
cls = type.__new__(mcs, name, bases, attrs)
cls.__init__ = mcs._init_wrapper_func(cls.__init__)
return cls
class _StayTheSame(object):
pass
STAY_THE_SAME = _StayTheSame()
class SafeStdio(six.with_metaclass(SafeStdioMeta)):
_wrapper_func = {}
def __getattribute__(self, item):
_wrapper_func = super(SafeStdio, self).__getattribute__("_wrapper_func")
if item not in _wrapper_func:
attr = super(SafeStdio, self).__getattribute__(item)
if (not item.startswith("__") or not item.endswith("__")) and isinstance(attr, MethodType):
if "stdio" in inspect2.signature(attr).parameters:
_wrapper_func[item] = safe_stdio_decorator(default_stdio=getattr(self, "stdio", None))(attr)
return _wrapper_func[item]
_wrapper_func[item] = STAY_THE_SAME
return attr
if _wrapper_func[item] is STAY_THE_SAME:
return super(SafeStdio, self).__getattribute__(item)
return _wrapper_func[item]
def __setattr__(self, key, value):
if key in self._wrapper_func:
del self._wrapper_func[key]
return super(SafeStdio, self).__setattr__(key, value)
......@@ -30,7 +30,9 @@ from _deploy import (
ServerConfigFlyweightFactory,
ClusterConfig,
ConfigParser,
CommentedMap
CommentedMap,
RsyncConfig,
ENV
)
......@@ -85,11 +87,12 @@ class ClusterConfigParser(ConfigParser):
server_config['zone'] = zone_name
servers[server] = server_config
cluster_conf = ClusterConfig(
cluster_config = ClusterConfig(
servers.keys(),
component_name,
ConfigUtil.get_value_from_dict(conf, 'version', None, str),
ConfigUtil.get_value_from_dict(conf, 'tag', None, str),
ConfigUtil.get_value_from_dict(conf, 'release', None, str),
ConfigUtil.get_value_from_dict(conf, 'package_hash', None, str)
)
global_config = {}
......@@ -99,11 +102,17 @@ class ClusterConfigParser(ConfigParser):
global_config['appname'] = str(conf['name'])
if 'config' in conf:
global_config.update(conf['config'])
cluster_conf.set_global_conf(global_config)
cluster_config.set_global_conf(global_config)
if RsyncConfig.RSYNC in conf:
cluster_config.set_rsync_list(conf[RsyncConfig.RSYNC])
if ENV in conf:
cluster_config.set_environments(conf[ENV])
for server in servers:
cluster_conf.add_server_conf(server, servers[server])
return cluster_conf
cluster_config.add_server_conf(server, servers[server])
return cluster_config
@classmethod
def extract_inner_config(cls, cluster_config, config):
......
此差异已折叠。
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
from tool import YamlLoader, ConfigUtil
ALLOWED_LEVEL = [0, 1, 2]
YAML_LOADER = YamlLoader()
YAML_TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), "command_template.yaml")
class CommandConfig(object):
def __init__(self, yaml_path=YAML_TEMPLATE_PATH, loader=YAML_LOADER, stdio=None):
self.yaml_path = yaml_path
self.loader = loader
self.stdio = stdio
self._load()
def _load(self):
try:
with open(self.yaml_path, 'rb') as f:
self._data = self.loader.load(f)
self.all_variables = self._data.get('variables')
self.global_variables = self.all_variables.get('global', [])
self.server_variables = self.all_variables.get('server', [])
self.ssh_variables = self.all_variables.get('ssh', [])
self.all_commands = self._data.get('commands', [])
self.all_wrappers = self._data.get('wrappers', [])
except:
if self.stdio:
self.stdio.exception('failed to load command template')
def check_opt(plugin_context, name, context, *args, **kwargs):
def get_option(key, default=''):
value = getattr(options, key, default)
if value is None:
value = default
stdio.verbose('get option: %s value %s' % (key, value))
return value
stdio = plugin_context.stdio
cluster_config = plugin_context.cluster_config
options = plugin_context.options
clients = plugin_context.clients
deployed_components = list(plugin_context.components)
components = get_option("components", None)
servers = get_option("servers", None)
interactive = False
command_config = CommandConfig()
for command in command_config.all_commands:
command_name = ConfigUtil.get_value_from_dict(command, 'name', transform_func=str)
if command_name == name:
interactive = ConfigUtil.get_value_from_dict(command, 'interactive', False, transform_func=bool)
if components is None:
if interactive:
components = deployed_components[:1]
stdio.verbose("Component {} will be used according to the order in the deploy configuration yaml.".format(components[0]))
else:
components = deployed_components
stdio.verbose("Component {} will be used because {} is a non-interactive command".format(", ".join(components), name))
elif components == "*":
components = deployed_components
else:
components = components.split(',')
if not clients:
stdio.error("{} server list is empty".format(','.join(components)))
return
if servers is None:
if interactive:
servers = [None, ]
else:
servers = list(clients.keys())
stdio.verbose("Server {} will be used because {} is a non-interactive command".format(", ".join([str(s) for s in servers]), name))
elif servers == '*':
servers = list(clients.keys())
else:
server_names = servers.split(',')
servers = []
for server in clients:
if server.name in server_names:
server_names.remove(server.name)
servers.append(server)
if server_names:
stdio.error("Server {} not found in current deployment".format(','.join(server_names)))
return
failed_components = []
for component in components:
if component not in deployed_components:
failed_components.append(component)
if failed_components:
stdio.error('{} not support. {} is allowed'.format(','.join(failed_components), deployed_components))
return plugin_context.return_false()
context.update(components=components, servers=servers, command_config=command_config)
return plugin_context.return_true(context=context)
variables:
ssh:
- name: host
config_key: host
components: ['oceanbase', 'obproxy', 'oceanbase-ce', 'obproxy-ce']
- name: user
config_key: username
components: ['oceanbase', 'obproxy', 'oceanbase-ce', 'obproxy-ce']
server:
- name: home_path
config_key: home_path
components: ['oceanbase', 'oceanbase-ce', 'obproxy', 'obproxy-ce']
- name: mysql_port
config_key: mysql_port
components: ['oceanbase', 'oceanbase-ce']
global:
- name: password
config_key: root_password
components: ['oceanbase', 'oceanbase-ce']
- name: password
config_key: observer_root_password
components: ['obproxy', 'obproxy-ce']
wrappers:
- name: ssh
remote_command: ssh {user}@{host} -t '{cmd}'
local_command: "{cmd}"
- name: ssh_client
command: "{cmd}"
executor: "ssh_client"
commands:
- name: ssh
components: ['oceanbase', 'obproxy', 'oceanbase-ce', 'obproxy-ce']
command: "cd {home_path}/log;bash --login"
wrapper: "ssh"
interactive: true
- name: less
command: "less {home_path}/log/observer.log"
components: ['oceanbase', 'oceanbase-ce']
wrapper: "ssh"
interactive: true
no_interruption: true
- name: less
command: "less {home_path}/log/obproxy.log"
components: ['obproxy', 'obproxy-ce']
wrapper: "ssh"
interactive: true
no_interruption: true
- name: pid
wrapper: ssh_client
command: "pgrep -u {user} -f ^{home_path}/bin/observer"
components: ['oceanbase', 'oceanbase-ce']
no_excption: true
- name: pid
wrapper: ssh_client
command: "pgrep -u {user} -f ^{home_path}/bin/obproxy"
components: ['obproxy', 'obproxy-ce']
no_excption: true
- name: gdb
wrapper: "ssh"
command: "cd {home_path}; LD_LIBRARY_PATH=./lib:$LD_LIBRARY_PATH gdb --pid=`$pid`"
components: ['oceanbase', 'oceanbase-ce']
interactive: true
no_interruption: true
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
try:
import subprocess32 as subprocess
except:
import subprocess
import signal
import os
from ssh import LocalClient
from tool import var_replace, COMMAND_ENV
def commands(plugin_context, context, *args, **kwargs):
def get_value_from_context(key, default=None):
value = context.get(key, default)
stdio.verbose('get value from context: %s value %s' % (key, value))
return value
stdio = plugin_context.stdio
command_template = get_value_from_context("command_template")
command_variables = get_value_from_context("command_variables", {})
interactive = get_value_from_context("interactive")
results = get_value_from_context("results", [])
failed = get_value_from_context("failed", False)
no_exception = get_value_from_context("no_exception", False)
no_interruption = get_value_from_context("no_interruption", False)
executor = get_value_from_context("executor", False)
component = get_value_from_context("component", False)
server = get_value_from_context("server", None)
env = get_value_from_context("env", {})
cmd = command_template.format(**command_variables)
cmd = var_replace(cmd, env)
if interactive:
if no_interruption:
stdio.verbose('ctrl c is not accepted in this command')
def _no_interruption(signum, frame):
stdio.verbose('ctrl c is not accepted in this command')
signal.signal(signal.SIGINT, _no_interruption)
stdio.verbose('exec cmd: {}'.format(cmd))
subprocess.call(cmd, env=os.environ.copy(), shell=True)
else:
client = plugin_context.clients[server]
if executor == "ssh_client":
ret = client.execute_command(cmd, stdio=stdio)
else:
ret = LocalClient.execute_command(cmd, env=client.env, stdio=stdio)
if ret and ret.stdout:
results.append([component, server, ret.stdout.strip()])
elif not no_exception:
failed = True
context.update(results=results, failed=failed)
return plugin_context.return_true(context=context)
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from tool import ConfigUtil
class CommandVariables(dict):
def __getitem__(self, item):
if item not in self.items():
return item
else:
return super(CommandVariables, self).__getitem__(item)
def load_variables_from_config(variables, component, config, command_variables, stdio=None):
for variable in variables:
if component not in ConfigUtil.get_list_from_dict(variable, 'components', str):
continue
variable_name = ConfigUtil.get_value_from_dict(variable, 'name', transform_func=str)
config_key = ConfigUtil.get_value_from_dict(variable, 'config_key', transform_func=str)
value = config.get(config_key)
if value is not None:
command_variables[variable_name] = str(value)
if stdio:
stdio.verbose('get variable %s for config key %s, value is %s' % (variable_name, config_key, value))
def prepare_variables(plugin_context, name, context, component, server, *args, **kwargs):
def get_value_from_context(key, default=None):
value = context.get(key, default)
stdio.verbose('get value from context: %s value %s' % (key, value))
return value
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
clients = plugin_context.clients
components = get_value_from_context("components", [])
servers = get_value_from_context("servers", [])
cmd_conf = get_value_from_context("command_config")
loading_env = {}
if server is None:
server = cluster_config.servers[0]
# find command template
command_template = None
interactive = None
wrapper_name = None
no_exception = False
no_interruption = False
executor = None
command_variables = CommandVariables()
for command in cmd_conf.all_commands:
cmd_name = ConfigUtil.get_value_from_dict(command, 'name', transform_func=str)
allow_components = ConfigUtil.get_list_from_dict(command, 'components', str)
if component in allow_components:
current_command = ConfigUtil.get_value_from_dict(command, 'command', transform_func=str)
loading_env[cmd_name] = current_command
if name == cmd_name:
command_template = current_command
interactive = ConfigUtil.get_value_from_dict(command, 'interactive', transform_func=bool)
wrapper_name = ConfigUtil.get_value_from_dict(command, 'wrapper', transform_func=str)
no_exception = ConfigUtil.get_value_from_dict(command, 'no_exception', transform_func=bool)
no_interruption = ConfigUtil.get_value_from_dict(command, 'no_interruption', transform_func=bool)
if command_template is None:
stdio.error(
'There is no command {} in component {}. Please use --components to set the right component.'.format(name,
component))
return
if interactive and (len(components) > 1 or len(servers) > 1):
stdio.error('Interactive commands do not support specifying multiple components or servers.')
return
cmd_input = None
if server not in cluster_config.servers:
if interactive:
stdio.error("{} is not a server in {}".format(server, component))
return plugin_context.return_false()
else:
stdio.verbose("{} is not a server in {}".format(server, component))
return plugin_context.return_true(skip=True)
global_config = cluster_config.get_global_conf()
server_config = cluster_config.get_server_conf(server)
client = clients[server]
ssh_config = vars(client.config)
# load global config
stdio.verbose('load variables from global config')
load_variables_from_config(cmd_conf.global_variables, component, global_config, command_variables, stdio)
# load server config
stdio.verbose('load variables from server config')
load_variables_from_config(cmd_conf.server_variables, component, server_config, command_variables, stdio)
# load ssh config
stdio.verbose('load variables from ssh config')
load_variables_from_config(cmd_conf.ssh_variables, component, ssh_config, command_variables, stdio)
if wrapper_name:
for wrapper in cmd_conf.all_wrappers:
if wrapper_name == ConfigUtil.get_value_from_dict(wrapper, 'name', transform_func=str):
local_command = ConfigUtil.get_value_from_dict(wrapper, "local_command", transform_func=str)
remote_command = ConfigUtil.get_value_from_dict(wrapper, "remote_command", transform_func=str)
command = ConfigUtil.get_value_from_dict(wrapper, "command", transform_func=str)
cmd_input = ConfigUtil.get_value_from_dict(wrapper, "input", transform_func=str)
executor = ConfigUtil.get_value_from_dict(wrapper, "executor", transform_func=str)
if local_command and remote_command:
if client.is_localhost():
command = local_command
else:
command = remote_command
command_template = command.format(cmd=command_template, **command_variables)
if cmd_input:
cmd_input = cmd_input.format(cmd=command_template, **command_variables)
break
else:
stdio.error("Wrapper {} not found in component {}.".format(wrapper_name, component))
for key, value in loading_env.items():
loading_env[key] = str(value).format(**command_variables)
context.update(
command_variables=command_variables, command_config=cmd_conf, command_template=command_template,
interactive=interactive, cmd_input=cmd_input, no_exception=no_exception, no_interruption=no_interruption,
component=component, server=server, env=loading_env, executor=executor)
return plugin_context.return_true()
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from subprocess import call, Popen, PIPE
from ssh import LocalClient
def db_connect(plugin_context, *args, **kwargs):
def get_option(key, default=''):
value = getattr(options, key)
if value is None:
value = default
stdio.verbose('get option: %s value %s' % (key, value))
return value
def local_execute_command(command, env=None, timeout=None):
return LocalClient.execute_command(command, env, timeout, stdio)
def get_connect_cmd():
cmd = r"{obclient_bin} -h{host} -P{port} -u {user}@{tenant} --prompt 'OceanBase(\u@\d)>' -A".format(
obclient_bin=obclient_bin,
host=server.ip,
port=port,
user=user,
tenant=tenant
)
if need_password:
cmd += " -p"
elif password:
cmd += " -p{}".format(password)
if database:
cmd += " -D{}".format(database)
return cmd
def test_connect():
return local_execute_command(get_connect_cmd() + " -e 'help'")
def connect():
conn_cmd = get_connect_cmd()
stdio.verbose('execute cmd: {}'.format(conn_cmd))
p = None
return_code = 255
try:
p = Popen(conn_cmd, shell=True)
return_code = p.wait()
except:
stdio.exception("")
if p:
p.kill()
stdio.verbose('exit code: {}'.format(return_code))
return return_code
options = plugin_context.options
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
user = get_option('user', 'root')
tenant = get_option('tenant', 'sys')
database = get_option('database')
password = get_option('password')
obclient_bin = get_option('obclient_bin')
server = get_option('server')
component = get_option('component')
global_conf = cluster_config.get_global_conf()
server_config = cluster_config.get_server_conf(server)
need_password = False
# use oceanbase if root@sys as default
if not database and user == 'root' and tenant == 'sys':
database = 'oceanbase'
if component in ["oceanbase", "oceanbase-ce"]:
port = server_config.get("mysql_port")
else:
port = server_config.get("listen_port")
if not obclient_bin:
ret = local_execute_command('%s --help' % obclient_bin)
if not ret:
stdio.error(
'%s\n%s is not an executable file. Please use `--obclient-bin` to set.\nYou may not have obclient installed' % (
ret.stderr, obclient_bin))
return
if not password:
connected = test_connect()
if not connected:
if user == "root" and tenant == "sys":
if component in ["oceanbase", "oceanbase-ce"]:
password = global_conf.get('root_password')
elif component in ["obproxy", "obproxy-ce"]:
password = global_conf.get('observer_root_password')
elif user == "root" and tenant == "proxysys":
if component in ["obproxy", "obproxy-ce"]:
password = global_conf.get("obproxy_sys_password")
elif user == "proxyro" and tenant == 'sys':
if component in ["oceanbase", "oceanbase-ce"]:
password = global_conf.get("proxyro_password")
elif component in ["obproxy", "obproxy-ce"]:
password = global_conf.get("observer_sys_password")
if password:
connected = test_connect()
need_password = not connected
try:
code = connect()
except KeyboardInterrupt:
stdio.exception("")
return False
return code == 0
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
import re
from _plugin import InstallPlugin
from _deploy import InnerConfigKeywords
from tool import YamlLoader
def install_repo(plugin_context, obd_home, install_repository, install_plugin, check_repository, check_file_map,
msg_lv, *args, **kwargs):
cluster_config = plugin_context.cluster_config
def install_to_home_path():
repo_dir = install_repository.repository_dir.replace(obd_home, remote_obd_home, 1)
if is_lib_repo:
home_path = os.path.join(remote_home_path, 'lib')
else:
home_path = remote_home_path
client.add_env("_repo_dir", repo_dir, True)
client.add_env("_home_path", home_path, True)
mkdir_bash = "mkdir -p ${_home_path} && cd ${_repo_dir} && find -type d | xargs -i mkdir -p ${_home_path}/{}"
if not client.execute_command(mkdir_bash):
return False
success = True
for install_file_item in install_file_items:
source = os.path.join(repo_dir, install_file_item.target_path)
target = os.path.join(home_path, install_file_item.target_path)
client.add_env("source", source, True)
client.add_env("target", target, True)
if install_file_item.install_method == InstallPlugin.InstallMethod.CP:
install_cmd = "cp -f"
else:
install_cmd = "ln -fs"
if install_file_item.type == InstallPlugin.FileItemType.DIR:
if client.execute_command("ls -1 ${source}"):
success = client.execute_command("cd ${source} && find -type f | xargs -i %(install_cmd)s ${source}/{} ${target}/{}" % {"install_cmd": install_cmd}) and success
success = client.execute_command("cd ${source} && find -type l | xargs -i %(install_cmd)s ${source}/{} ${target}/{}" % {"install_cmd": install_cmd}) and success
else:
success = client.execute_command("%(install_cmd)s ${source} ${target}" % {"install_cmd": install_cmd}) and success
return success
stdio = plugin_context.stdio
clients = plugin_context.clients
servers = cluster_config.servers
is_lib_repo = install_repository.name.endswith("-libs")
home_path_map = {}
for server in servers:
server_config = cluster_config.get_server_conf(server)
home_path_map[server] = server_config.get("home_path")
is_ln_install_mode = cluster_config.is_ln_install_mode()
# remote install repository
stdio.start_loading('Remote %s repository install' % install_repository)
stdio.verbose('Remote %s repository integrity check' % install_repository)
for server in servers:
client = clients[server]
remote_home_path = home_path_map[server]
install_file_items = install_plugin.file_map(install_repository).values()
stdio.verbose('%s %s repository integrity check' % (server, install_repository))
if is_ln_install_mode:
remote_obd_home = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
install_path = install_repository.repository_dir.replace(obd_home, remote_obd_home, 1)
else:
if is_lib_repo:
install_path = os.path.join(remote_home_path, 'lib')
else:
install_path = remote_home_path
client.execute_command('mkdir -p {}'.format(install_path))
remote_repository_data_path = os.path.join(install_path, '.data')
remote_repository_data = client.execute_command('cat %s' % remote_repository_data_path).stdout
stdio.verbose('%s %s install check' % (server, install_repository))
try:
yaml_loader = YamlLoader(stdio=stdio)
data = yaml_loader.load(remote_repository_data)
if not data:
stdio.verbose('%s %s need to be installed ' % (server, install_repository))
elif data == install_repository:
# Version sync. Check for damages (TODO)
stdio.verbose('%s %s has installed ' % (server, install_repository))
if not install_to_home_path():
stdio.error("Failed to install repository {} to {}".format(install_repository, remote_home_path))
return False
continue
else:
stdio.verbose('%s %s need to be updated' % (server, install_repository))
except:
stdio.exception('')
stdio.verbose('%s %s need to be installed ' % (server, install_repository))
stdio.verbose('%s %s installing' % (server, install_repository))
for file_item in install_file_items:
file_path = os.path.join(install_repository.repository_dir, file_item.target_path)
remote_file_path = os.path.join(install_path, file_item.target_path)
if file_item.type == InstallPlugin.FileItemType.DIR:
if os.path.isdir(file_path) and not client.put_dir(file_path, remote_file_path):
stdio.stop_loading('fail')
return False
else:
if not client.put_file(file_path, remote_file_path):
stdio.stop_loading('fail')
return False
if is_ln_install_mode:
# save data file for later comparing
client.put_file(install_repository.data_file_path, remote_repository_data_path)
# link files to home_path
install_to_home_path()
stdio.verbose('%s %s installed' % (server, install_repository.name))
stdio.stop_loading('succeed')
# check lib
lib_check = True
stdio.start_loading('Remote %s repository lib check' % check_repository)
for server in servers:
stdio.verbose('%s %s repository lib check' % (server, check_repository))
client = clients[server]
remote_home_path = home_path_map[server]
need_libs = set()
client.add_env('LD_LIBRARY_PATH', '%s/lib:' % remote_home_path, True)
for file_item in check_file_map.values():
if file_item.type == InstallPlugin.FileItemType.BIN:
remote_file_path = os.path.join(remote_home_path, file_item.target_path)
ret = client.execute_command('ldd %s' % remote_file_path)
libs = re.findall('(/?[\w+\-/]+\.\w+[\.\w]+)[\s\\n]*\=\>[\s\\n]*not found', ret.stdout)
if not libs:
libs = re.findall('(/?[\w+\-/]+\.\w+[\.\w]+)[\s\\n]*\=\>[\s\\n]*not found', ret.stderr)
if not libs and not ret:
stdio.error('Failed to execute repository lib check.')
return
need_libs.update(libs)
if need_libs:
for lib in need_libs:
getattr(stdio, msg_lv, '%s %s require: %s' % (server, check_repository, lib))
lib_check = False
client.add_env('LD_LIBRARY_PATH', '', True)
if msg_lv == 'error':
stdio.stop_loading('succeed' if lib_check else 'fail')
elif msg_lv == 'warn':
stdio.stop_loading('succeed' if lib_check else 'warn')
return plugin_context.return_true(checked=lib_check)
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
from _deploy import RsyncConfig
def rsync(plugin_context, *args, **kwargs):
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
clients = plugin_context.clients
rsync_configs = cluster_config.get_rsync_list()
if not rsync_configs:
return plugin_context.return_true()
stdio.start_loading("Synchronizing runtime dependencies")
succeed = True
for rsync_config in rsync_configs:
source_path = rsync_config.get(RsyncConfig.SOURCE_PATH)
target_path = rsync_config.get(RsyncConfig.TARGET_PATH)
if os.path.isabs(target_path):
rsync_config[RsyncConfig.TARGET_PATH] = os.path.normpath('./' + target_path)
sub_io = stdio.sub_io()
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
client = clients[server]
home_path = server_config['home_path']
for rsync_config in rsync_configs:
source_path = rsync_config.get(RsyncConfig.SOURCE_PATH)
target_path = rsync_config.get(RsyncConfig.TARGET_PATH)
if os.path.isdir(source_path):
stdio.verbose('put local dir %s to %s: %s.' % (source_path, server, target_path))
if not client.put_dir(source_path, os.path.join(home_path, target_path), stdio=sub_io):
stdio.warn('failed to put local dir %s to %s: %s.' % (source_path, server, target_path))
succeed = False
elif os.path.exists(source_path):
stdio.verbose('put local file %s to %s: %s.' % (source_path, server, target_path))
if not client.put_file(source_path, os.path.join(home_path, target_path), stdio=sub_io):
stdio.warn('failed to put local file %s to %s: %s.' % (source_path, server, target_path))
succeed = False
else:
stdio.verbose('%s is not found.' % source_path)
if succeed:
stdio.stop_loading("succeed")
return plugin_context.return_true()
else:
stdio.stop_loading("fail")
return plugin_context.return_false()
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
def sync_cluster_config(plugin_context, *args, **kwargs):
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
root_servers = {}
ob_config = cluster_config.get_depend_config(comp)
if not ob_config:
continue
odp_config = cluster_config.get_global_conf()
for server in cluster_config.get_depend_servers(comp):
config = cluster_config.get_depend_config(comp, server)
zone = config['zone']
if zone not in root_servers:
root_servers[zone] = '%s:%s' % (server.ip, config['mysql_port'])
depend_rs_list = ';'.join([root_servers[zone] for zone in root_servers])
cluster_config.update_global_conf('rs_list', depend_rs_list, save=False)
config_map = {
'observer_sys_password': 'proxyro_password',
'cluster_name': 'appname',
'observer_root_password': 'root_password'
}
for key in config_map:
ob_key = config_map[key]
if not odp_config.get(key) and ob_config.get(ob_key):
stdio.verbose("update config, key: {}, value: {}".format(key, ob_config.get(ob_key)))
cluster_config.update_global_conf(key, ob_config.get(ob_key), save=False)
break
......@@ -30,6 +30,23 @@ def check_opt(plugin_context, opt, *args, **kwargs):
server = opt['test_server']
obclient_bin = opt['obclient_bin']
mysqltest_bin = opt['mysqltest_bin']
reboot_retries = opt['reboot_retries']
if int(reboot_retries) <= 0:
stdio.error('invalid reboot-retries')
return
case_filter = opt.get('case_filter')
default_case_filter = './mysql_test/filter.py'
if case_filter is None and os.path.exists(default_case_filter):
stdio.verbose('case-filter not set and {} exists, use it'.format(default_case_filter))
opt['case_filter'] = default_case_filter
case_filter = opt.get('reboot_cases')
default_reboot_case = './mysql_test/rebootcases.py'
if case_filter is None and os.path.exists(default_reboot_case):
stdio.verbose('reboot-cases not set and {} exists, use it'.format(default_reboot_case))
opt['reboot_cases'] = default_reboot_case
if not server:
stdio.error('test server is None. please use `--test-server` to set')
......@@ -42,7 +59,7 @@ def check_opt(plugin_context, opt, *args, **kwargs):
if not ret:
mysqltest_bin = opt['mysqltest_bin'] = 'mysqltest'
if not LocalClient.execute_command('%s --help' % mysqltest_bin, stdio=stdio):
stdio.error('%s\n%s is not an executable file. please use `--mysqltest-bin` to set\nYou may not have obclient installed' % (ret.stderr, mysqltest_bin))
stdio.error('%s\n%s is not an executable file. please use `--mysqltest-bin` to set\nYou may not have mysqltest installed' % (ret.stderr, mysqltest_bin))
return
if 'suite_dir' not in opt or not os.path.exists(opt['suite_dir']):
......@@ -55,5 +72,37 @@ def check_opt(plugin_context, opt, *args, **kwargs):
if 'slb' in opt:
opt['slb_host'], opt['slb_id'] = opt['slb'].split(',')
if 'exclude' in opt and opt['exclude']:
opt['exclude'] = opt['exclude'].split(',')
cluster_config = plugin_context.cluster_config
is_obproxy = opt["component"].startswith("obproxy")
if is_obproxy:
intersection = list({'oceanbase', 'oceanbase-ce'}.intersection(set(cluster_config.depends)))
if not intersection:
stdio.warn('observer config not in the depends.')
return
ob_component = intersection[0]
global_config = cluster_config.get_depend_config(ob_component)
else:
global_config = cluster_config.get_global_conf()
cursor = opt['cursor']
opt['_enable_static_typing_engine'] = None
if '_enable_static_typing_engine' in global_config:
stdio.verbose('load engine from config')
opt['_enable_static_typing_engine'] = global_config['_enable_static_typing_engine']
else:
try:
sql = "select value from oceanbase.__all_virtual_sys_parameter_stat where name like '_enable_static_typing_engine';"
stdio.verbose('execute sql: {}'.format(sql))
cursor.execute(sql)
ret = cursor.fetchone()
stdio.verbose('query engine ret: {}'.format(ret))
if ret:
opt['_enable_static_typing_engine'] = ret.get('value')
except:
stdio.exception('')
stdio.verbose('_enable_static_typing_engine: {}'.format(opt['_enable_static_typing_engine']))
return plugin_context.return_true()
......@@ -21,25 +21,93 @@
from __future__ import absolute_import, division, print_function
import os
import sys
import re
from glob import glob
from mysqltest_lib import case_filter, succtest
from mysqltest_lib.psmallsource import psmall_source
from mysqltest_lib.psmalltest import psmall_test
import tool
from mysqltest_lib import succtest
def get_variable_from_python_file(file_path, var_name, default_file=None, default_value=None, stdio=None):
global_vars = {}
try:
stdio and stdio.verbose('read variable {} from {}'.format(var_name, file_path))
exec(open(file_path).read(), global_vars, global_vars)
except Exception as e:
stdio and stdio.warn(str(e))
if default_file:
try:
default_path = os.path.join(os.path.dirname(__file__), 'mysqltest_lib', default_file)
stdio and stdio.verbose('read variable {} from {}'.format(var_name, file_path))
exec(open(default_path).read(), global_vars, global_vars)
except Exception as ex:
stdio and stdio.warn(str(ex))
return global_vars.get(var_name, default_value)
def find_tag_test_with_file_pat(file_pattern, flag_pattern, tag, filelist):
for test in glob(file_pattern):
if "test_suite/" in test:
if os.path.dirname(test).split('/')[-2] == tag:
filelist.append(test)
continue
test_file = tool.FileUtil.open(test, 'rb')
line_num = 0
line = test_file.readline().decode('utf-8', 'ignore')
while line and line_num <= 30:
line_num += 1
matchobj = re.search(flag_pattern, line)
if matchobj:
tag_set = line.split(':')[1].split(',')
for tag_tmp in tag_set:
tag_t = tag_tmp.strip()
if tag.lower() == tag_t.lower():
filelist.append(test)
line = test_file.readline().decode('utf-8', 'ignore')
def find_tag_tests(opt, flag_pattern, tags):
filelist = []
for tag in tags:
test_pattern = os.path.join(opt['test_dir'], "*.test")
find_tag_test_with_file_pat(test_pattern, flag_pattern, tag, filelist)
test_pattern = os.path.join(opt['suite_dir'], "*/t/*.test")
find_tag_test_with_file_pat(test_pattern, flag_pattern, tag, filelist)
return filelist
def test_name(test_file):
if "test_suite/" in test_file:
suite_name = os.path.dirname(test_file).split('/')[-2]
base_name = os.path.basename(test_file).rsplit('.')[0]
return suite_name + '.' + base_name
else:
base_name = os.path.basename(test_file).rsplit('.')[0]
return base_name
def check_test(plugin_context, opt, *args, **kwargs):
stdio = plugin_context.stdio
cluster_config = plugin_context.cluster_config
tags = []
regress_suites = []
if opt.get('tags'):
tags = opt['tags'].split(',')
if opt.get('regress_suite'):
regress_suites = opt['regress_suite'].split(',')
test_set = []
has_test_point = False
basename = lambda path: os.path.basename(path)
dirname =lambda path: os.path.dirname(path)
if 'all' in opt and opt['all'] and os.path.isdir(os.path.realpath(opt['suite_dir'])):
opt['suite'] = ','.join(os.listdir(os.path.realpath(opt['suite_dir'])))
if 'psmall' in opt and opt['psmall']:
test_set = psmall_test
opt['source_limit'] = psmall_source
test_set = get_variable_from_python_file(
opt.get('psmall_test'), 'psmall_test', default_file='psmalltest.py', default_value=[], stdio=stdio)
opt['source_limit'] = get_variable_from_python_file(
opt.get('psmall_source'), 'psmall_source', default_file='psmallsource.py', default_value={}, stdio=stdio)
has_test_point = True
elif 'suite' not in opt or not opt['suite']:
if 'test_set' in opt and opt['test_set']:
test_set = opt['test_set'].split(',')
......@@ -64,17 +132,79 @@ def check_test(plugin_context, opt, *args, **kwargs):
opt['test_pattern'] = '*.test'
pat = os.path.join(path, opt['test_pattern'])
test_set_tmp = [suitename + '.' + basename(test).rsplit('.', 1)[0] for test in glob(pat)]
test_set.extend(test_set_tmp)
if "all" in opt and opt["all"]:
pat = os.path.join(opt['test_dir'], "*.test")
test_set_t = [basename(test).rsplit('.', 1)[0] for test in glob(pat)]
test_set.extend(test_set_t)
if opt["cluster_mode"]:
opt["filter"] = opt["cluster_mode"]
else:
opt["filter"] = 'c'
if opt.get("java"):
opt["filter"] = 'j'
if opt.get("ps"):
opt["filter"] = opt["filter"] + 'p'
opt['ps_protocol'] = True
if opt["component"].startswith("obproxy"):
opt["filter"] = 'proxy'
else:
test_zone = cluster_config.get_server_conf(opt['test_server'])['zone']
query = 'select zone, count(*) as a from oceanbase.__all_virtual_zone_stat group by region order by a desc limit 1'
try:
stdio.verbose('execute sql: {}'.format(query))
cursor = opt['cursor']
cursor.execute(query)
ret = cursor.fetchone()
except:
msg = 'execute sql exception: %s' % query
raise Exception(msg)
primary_zone = ret.get('zone', '')
if test_zone != primary_zone:
opt["filter"] = 'slave'
if regress_suites:
suite2tags = get_variable_from_python_file(opt.get('regress_suite_map'), 'suite2tags', default_file='regress_suite_map.py', default_value={}, stdio=stdio)
composite_suite = get_variable_from_python_file(opt.get('regress_suite_map'), 'composite_suite', default_file='regress_suite_map.py', default_value={}, stdio=stdio)
for suitename in regress_suites:
if suitename in composite_suite.keys():
regress_suite_list = composite_suite[suitename].split(',')
else:
regress_suite_list = [suitename]
for name in regress_suite_list:
if name in suite2tags.keys():
if suite2tags[name]:
tags.extend(suite2tags[name].split(','))
else:
tags.append(name)
tags = list(set(tags))
if tags:
stdio.verbose('running mysqltest by tag, all tags: {}'.format(tags))
support_test_tags = get_variable_from_python_file(
opt.get('test_tags'), 'test_tags', default_file='test_tags.py', default_value=[], stdio=stdio)
support_test_tags = list(set(support_test_tags).union(set(os.listdir(os.path.join(opt["suite_dir"])))))
diff_tags = list(set(tags).difference(set(support_test_tags)))
if len(diff_tags) > 0:
stdio.error('%s not in test_tags' % ','.join(diff_tags))
return plugin_context.return_false()
test_set_by_tag = [test_name(test) for test in find_tag_tests(opt, r"#[ \t]*tags[ \t]*:", tags)]
if has_test_point:
test_set = list(set(test_set).intersection(set(test_set_by_tag)))
else:
test_set = list(set(test_set_by_tag))
has_test_point = True
stdio.verbose('filter mode: {}'.format(opt["filter"]))
# exclude somt tests.
if 'exclude' not in opt or not opt['exclude']:
opt['exclude'] = []
test_set = filter(lambda k: k not in opt['exclude'], test_set)
if 'filter' in opt and opt['filter']:
exclude_list = getattr(case_filter, '%s_list' % opt['filter'], [])
if opt.get('case_filter'):
exclude_list = get_variable_from_python_file(opt['case_filter'], var_name='%s_list' % opt['filter'],
default_file='case_filter.py', default_value=[], stdio=stdio)
else:
exclude_list = []
test_set = filter(lambda k: k not in exclude_list, test_set)
##有all参数时重新排序,保证运行case的顺序
if 'all' in opt and opt['all'] == 'all':
test_set_suite = filter(lambda k: '.' in k, test_set)
......@@ -86,11 +216,30 @@ def check_test(plugin_context, opt, *args, **kwargs):
test_set = filter(lambda k: k not in succtest.succ_filter, test_set)
else:
test_set = sorted(test_set)
slb_host = opt.get('slb_host')
exec_id = opt.get('exec_id')
use_slb = all([slb_host is not None, exec_id is not None])
slices = opt.get('slices')
slice_idx = opt.get('slice_idx')
use_slices = all([slices is not None, slice_idx is not None])
if not use_slb and use_slices:
slices = int(slices)
slice_idx = int(slice_idx)
test_set = test_set[slice_idx::slices]
if 'mode' in opt and opt['mode'] != 'both':
if opt['mode'] == 'oracle':
not_run = '_mysql'
# test_set = filter(lambda k: not k.endswith(not_run), test_set)
test_set = filter(lambda k: k.endswith('_oracle'), test_set)
if opt['mode'] == 'mysql':
not_run = '_oracle'
test_set = filter(lambda k: not k.endswith(not_run), test_set)
opt['test_set'] = list(set(test_set))
if 'slices' in opt and opt['slices'] and 'slice_idx' in opt and opt['slice_idx']:
slices = int(opt['slices'])
slice_idx = int(opt['slice_idx'])
test_set = test_set[slice_idx::slices]
opt['test_set'] = test_set
if opt.get('reboot_cases'):
reboot_cases = get_variable_from_python_file(opt['reboot_cases'], var_name='reboot_cases',
default_file='rebootcases.py', default_value=[], stdio=stdio)
opt['reboot_cases'] = list(set(test_set).intersection(set(reboot_cases)))
else:
opt['reboot_cases'] = []
return plugin_context.return_true(test_set=test_set)
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
def collect_log(plugin_context, env, test_name=None, *args, **kwargs):
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
if not env.get('collect_log', False):
stdio.verbose('collect_log is False')
return
if test_name is None:
case_results = env.get('case_results', [])
if case_results:
test_name = case_results[-1].get('name')
if test_name is None:
stdio.verbose('Undefined: test_name')
return
log_pattern = env.get('log_pattern', '*.log')
if not env.get('log_dir'):
log_dir = os.path.join(env['var_dir'], 'log')
else:
log_dir = env['log_dir']
is_obproxy = env["component"].startswith("obproxy")
ob_component = env["component"]
if is_obproxy:
intersection = list({'oceanbase', 'oceanbase-ce'}.intersection(set(cluster_config.depends)))
if not intersection:
stdio.warn('observer config not in the depends.')
return
ob_component = intersection[0]
ob_services = cluster_config.get_depend_servers(ob_component)
proxy_services = cluster_config.servers
else:
ob_services = cluster_config.servers
proxy_services = []
collect_components = env.get('collect_components')
if not collect_components:
collect_components = [ob_component]
else:
collect_components = collect_components.split(',')
if ob_component in collect_components:
for server in ob_services:
if is_obproxy:
server_config = cluster_config.get_depend_config(ob_component, server)
else:
server_config = cluster_config.get_server_conf(server)
ip = server.ip
port = server_config.get('mysql_port', 0)
client = clients[server]
home_path = server_config['home_path']
remote_path = os.path.join(home_path, 'log', log_pattern)
local_path = os.path.join(log_dir, test_name, '{}:{}'.format(ip, port))
stdio.start_loading('Collect log for {}'.format(server.name))
sub_io = stdio.sub_io()
client.get_dir(local_path, os.path.join(home_path, 'core.*'), stdio=sub_io)
if client.get_dir(local_path, remote_path, stdio=sub_io):
stdio.stop_loading('succeed')
else:
stdio.stop_loading('fail')
if 'obproxy' in collect_components:
if not is_obproxy:
stdio.warn('No obproxy detected.')
return
for server in proxy_services:
server_config = cluster_config.get_server_conf(server)
ip = server.ip
port = server_config.get('listen_port', 0)
client = clients[server]
home_path = server_config['home_path']
remote_path = os.path.join(home_path, 'log')
local_path = os.path.join(log_dir, test_name, '{}:{}'.format(ip, port))
stdio.start_loading('Collect obproxy log for {}'.format(server.name))
if client.get_dir(local_path, remote_path):
stdio.stop_loading('succeed')
else:
stdio.stop_loading('fail')
\ No newline at end of file
此差异已折叠。
......@@ -28,7 +28,7 @@ global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/' % (path))
ret = client.execute_command('rm -fr %s/' % (path), timeout=-1)
if not ret:
global global_ret
global_ret = False
......
......@@ -4,4 +4,5 @@
mode: 755
- src_path: ./home/admin/obagent/conf
target_path: conf
type: dir
\ No newline at end of file
type: dir
install_method: cp
\ No newline at end of file
......@@ -29,6 +29,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio = plugin_context.stdio
global_ret = True
force = getattr(plugin_context.options, 'force', False)
clean = getattr(plugin_context.options, 'clean', False)
stdio.start_loading('Initializes obagent work home')
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
......@@ -37,7 +38,18 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
stdio.verbose('%s init cluster work home', server)
if force:
need_clean = force
if clean and not force:
if client.execute_command('bash -c \'if [[ "$(ls -d {0} 2>/dev/null)" != "" && ! -O {0} ]]; then exit 0; else exit 1; fi\''.format(home_path)):
owner = client.execute_command("ls -ld %s | awk '{print $3}'" % home_path).stdout.strip()
global_ret = False
err_msg = ' {} is not empty, and the owner is {}'.format(home_path, owner)
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=err_msg))
continue
need_clean = True
if need_clean:
client.execute_command("pkill -9 -u `whoami` -f '^%s/bin/monagent -c conf/monagent.yaml'" % home_path)
ret = client.execute_command('rm -fr %s' % home_path)
if not ret:
global_ret = False
......@@ -55,10 +67,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=home_path)))
continue
if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib,conf,log}'" % (home_path)) \
and client.execute_command("cp -r %s/conf %s/" % (remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
if not client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib,conf,log}'" % home_path):
global_ret = False
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.PATH_ONLY.format(path=home_path)))
......
......@@ -226,7 +226,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
server_config[key] = ''
if isinstance(server_config[key], bool):
server_config[key] = str(server_config[key]).lower()
if server_config.get('crypto_method', 'plain').lower() == 'aes':
secret_key = generate_aes_b64_key()
crypto_path = server_config.get('crypto_path', 'conf/.config_secret.key')
......@@ -247,20 +247,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if not client.put_file(tf.name, path.replace(repository_dir, home_path)):
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
for path in glob(os.path.join(repository_dir, 'conf/*/*')):
if path.endswith('.yaml'):
continue
if os.path.isdir(path):
ret = client.put_dir(path, path.replace(repository_dir, home_path))
else:
ret = client.put_file(path, path.replace(repository_dir, home_path))
if not ret:
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
return
config = {
'log': {
'level': server_config.get('log_level', 'info'),
......@@ -287,7 +275,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if not client.put_file(tf.name, os.path.join(home_path, 'conf/monagent.yaml')):
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
return
log_path = '%s/log/monagent_stdout.log' % home_path
client.execute_command('cd %s;nohup %s/bin/monagent -c conf/monagent.yaml >> %s 2>&1 & echo $! > %s' % (home_path, home_path, log_path, remote_pid_path))
......
......@@ -38,16 +38,6 @@ def upgrade(plugin_context, search_py_script_plugin, apply_param_plugin, *args,
repository_dir = dest_repository.repository_dir
kwargs['repository_dir'] = repository_dir
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/lib/* %s/lib" % (remote_repository_dir, home_path))
stop_plugin = search_py_script_plugin([cur_repository], 'stop')[cur_repository]
start_plugin = search_py_script_plugin([dest_repository], 'start')[dest_repository]
connect_plugin = search_py_script_plugin([dest_repository], 'connect')[dest_repository]
......
......@@ -243,18 +243,6 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
for path in glob(os.path.join(repository_dir, 'conf/*/*')):
if path.endswith('.yaml'):
continue
if os.path.isdir(path):
ret = client.put_dir(path, path.replace(repository_dir, home_path))
else:
ret = client.put_file(path, path.replace(repository_dir, home_path))
if not ret:
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
config = {
'log': {
......
......@@ -28,7 +28,7 @@ global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/' % (path))
ret = client.execute_command('rm -fr %s/' % (path), timeout=-1)
if not ret:
# pring stderror
global global_ret
......
......@@ -28,7 +28,9 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio = plugin_context.stdio
global_ret = True
force = getattr(plugin_context.options, 'force', False)
clean = getattr(plugin_context.options, 'clean', False)
stdio.start_loading('Initializes obproxy work home')
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
client = clients[server]
......@@ -36,15 +38,25 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
stdio.verbose('%s init cluster work home', server)
if force:
need_clean = force
if clean and not force:
if client.execute_command('bash -c \'if [[ "$(ls -d {0} 2>/dev/null)" != "" && ! -O {0} ]]; then exit 0; else exit 1; fi\''.format(home_path)):
owner = client.execute_command("ls -ld %s | awk '{print $3}'" % home_path).stdout.strip()
global_ret = False
err_msg = ' {} is not empty, and the owner is {}'.format(home_path, owner)
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=err_msg))
continue
need_clean = True
if need_clean:
client.execute_command("pkill -9 -u `whoami` -f '^bash {home_path}/obproxyd.sh {home_path} {ip} {port} daemon$'".format(home_path=home_path, ip=server.ip, port=server_config.get('listen_port')))
client.execute_command("pkill -9 -u `whoami` -f '^%s/bin/obproxy --listen_port %s'" % (home_path, server_config.get('listen_port')))
ret = client.execute_command('rm -fr %s' % home_path)
if not ret:
global_ret = False
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
continue
if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib}'" % (home_path)) \
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
if not client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib}'" % home_path):
global_ret = False
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=home_path)))
......
......@@ -22,7 +22,7 @@ from __future__ import absolute_import, division, print_function
import os
import time
from copy import deepcopy
stdio = None
......@@ -49,7 +49,7 @@ def confirm_port(client, pid, port):
def confirm_command(client, pid, command):
command = command.replace(' ', '').strip()
if client.execute_command('bash -c \'cmd=`cat /proc/%s/cmdline`; if [ "$cmd" != "%s" ]; then exot 1; fi\'' % (pid, command)):
if client.execute_command('bash -c \'cmd=`cat /proc/%s/cmdline`; if [ "$cmd" != "%s" ]; then exit 1; fi\'' % (pid, command)):
return True
return False
......@@ -86,6 +86,26 @@ def obproxyd(home_path, client, ip, port):
return False
class EnvVariables(object):
def __init__(self, environments, client):
self.environments = environments
self.client = client
self.env_done = {}
def __enter__(self):
for env_key, env_value in self.environments.items():
self.env_done[env_key] = self.client.get_env(env_key)
self.client.add_env(env_key, env_value, True)
def __exit__(self, *args, **kwargs):
for env_key, env_value in self.env_done.items():
if env_value is not None:
self.client.add_env(env_key, env_value, True)
else:
self.client.del_env(env_key)
def start(plugin_context, local_home_path, repository_dir, need_bootstrap=False, *args, **kwargs):
global stdio
cluster_config = plugin_context.cluster_config
......@@ -152,13 +172,6 @@ def start(plugin_context, local_home_path, repository_dir, need_bootstrap=False,
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
if client.execute_command("bash -c 'if [ -f %s/bin/obproxy ]; then exit 1; else exit 0; fi;'" % home_path):
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/lib/* %s/lib" % (remote_repository_dir, home_path))
pid_path[server] = "%s/run/obproxy-%s-%s.pid" % (home_path, server.ip, server_config["listen_port"])
if use_parameter:
......@@ -187,6 +200,7 @@ def start(plugin_context, local_home_path, repository_dir, need_bootstrap=False,
clusters_cmd[server] = 'cd %s; %s' % (home_path, real_cmd[server])
for server in clusters_cmd:
environments = deepcopy(cluster_config.get_environments())
client = clients[server]
server_config = cluster_config.get_server_conf(server)
port = int(server_config["listen_port"])
......@@ -204,9 +218,10 @@ def start(plugin_context, local_home_path, repository_dir, need_bootstrap=False,
return plugin_context.return_false()
stdio.verbose('starting %s obproxy', server)
client.add_env('LD_LIBRARY_PATH', '%s/lib:' % server_config['home_path'], True)
ret = client.execute_command(clusters_cmd[server])
client.add_env('LD_LIBRARY_PATH', '', True)
if 'LD_LIBRARY_PATH' not in environments:
environments['LD_LIBRARY_PATH'] = '%s/lib:' % server_config['home_path']
with EnvVariables(environments, client):
ret = client.execute_command(clusters_cmd[server])
if not ret:
stdio.stop_loading('fail')
stdio.error('failed to start %s obproxy: %s' % (server, ret.stderr))
......
......@@ -38,16 +38,6 @@ def upgrade(plugin_context, search_py_script_plugin, apply_param_plugin, *args,
repository_dir = dest_repository.repository_dir
kwargs['repository_dir'] = repository_dir
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
client.execute_command("ln -fs %s/lib/* %s/lib" % (remote_repository_dir, home_path))
stop_plugin = search_py_script_plugin([cur_repository], 'stop')[cur_repository]
start_plugin = search_py_script_plugin([dest_repository], 'start')[dest_repository]
connect_plugin = search_py_script_plugin([dest_repository], 'connect')[dest_repository]
......
......@@ -29,11 +29,13 @@ from _errno import EC_OBSERVER_CAN_NOT_MIGRATE_IN
def parse_size(size):
_bytes = 0
if isinstance(size, str):
size = size.strip()
if not isinstance(size, str) or size.isdigit():
_bytes = int(size)
else:
units = {"B": 1, "K": 1<<10, "M": 1<<20, "G": 1<<30, "T": 1<<40}
match = re.match(r'([1-9][0-9]*)\s*([B,K,M,G,T])', size.upper())
match = re.match(r'^([1-9][0-9]*)\s*([B,K,M,G,T])$', size.upper())
_bytes = int(match.group(1)) * units[match.group(2)]
return _bytes
......@@ -59,6 +61,16 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
if not value:
value = default
return value
def get_parsed_option(key, default=''):
value = get_option(key=key, default=default)
try:
parsed_value = parse_size(value)
except:
stdio.exception("")
raise Exception("Invalid option {}: {}".format(key, value))
return parsed_value
def error(*arg, **kwargs):
stdio.error(*arg, **kwargs)
stdio.stop_loading('fail')
......@@ -70,6 +82,11 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
stdio = plugin_context.stdio
options = plugin_context.options
mode = get_option('mode', 'mysql').lower()
if not mode in ['mysql', 'oracle']:
error('No such tenant mode: %s.\n--mode must be `mysql` or `oracle`' % mode)
return
name = get_option('tenant_name', 'test')
unit_name = '%s_unit' % name
pool_name = '%s_pool' % name
......@@ -153,7 +170,7 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
exception('execute sql exception: %s' % sql)
return
units_id = set()
units_id = {}
res = cursor.fetchall()
for row in res:
if str(row['name']) == unit_name:
......@@ -162,7 +179,8 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
continue
for zone in str(row['zone_list']).replace(';', ',').split(','):
if zone in zones:
units_id.add(row['unit_config_id'])
unit_config_id = row['unit_config_id']
units_id[unit_config_id] = units_id.get(unit_config_id, 0) + 1
break
sql = 'select * from oceanbase.__all_unit_config order by name'
......@@ -178,8 +196,8 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
if str(row['name']) == unit_name:
unit_name += '1'
if row['unit_config_id'] in units_id:
cpu_total -= row['max_cpu']
mem_total -= row['max_memory']
cpu_total -= row['max_cpu'] * units_id[row['unit_config_id']]
mem_total -= row['max_memory'] * units_id[row['unit_config_id']]
# disk_total -= row['max_disk_size']
MIN_CPU = 2
......@@ -194,13 +212,18 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
if disk_total < MIN_DISK_SIZE:
return error('%s: resource not enough: disk space less than %s' % (zone_list, format_size(MIN_DISK_SIZE)))
try:
max_memory = get_parsed_option('max_memory', mem_total)
max_disk_size = get_parsed_option('max_disk_size', disk_total)
min_memory = get_parsed_option('min_memory', max_memory)
except Exception as e:
error(e)
return
max_cpu = get_option('max_cpu', cpu_total)
max_memory = parse_size(get_option('max_memory', mem_total))
max_iops = get_option('max_iops', MIN_IOPS)
max_disk_size = parse_size(get_option('max_disk_size', disk_total))
max_session_num = get_option('max_session_num', MIN_SESSION_NUM)
min_cpu = get_option('min_cpu', max_cpu)
min_memory = parse_size(get_option('min_memory', max_memory))
min_iops = get_option('min_iops', max_iops)
if cpu_total < max_cpu:
......@@ -258,7 +281,7 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql)
except:
exception('faild to crate pool, execute sql exception: %s' % sql)
exception('failed to create pool, execute sql exception: %s' % sql)
return
# create tenant
......@@ -274,8 +297,12 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
sql += ", default tablegroup ='%s'" % tablegroup
if locality:
sql += ", locality = '%s'" % locality
set_mode = "ob_compatibility_mode = '%s'" % mode
if variables:
sql += "set %s" % variables
sql += "set %s, %s" % (variables, set_mode)
else:
sql += "set %s" % set_mode
try:
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql)
......
......@@ -28,7 +28,7 @@ global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/' % (path))
ret = client.execute_command('rm -fr %s/' % (path), timeout=-1)
if not ret:
# print stderror
global global_ret
......
......@@ -4,4 +4,5 @@
mode: 755
- src_path: ./home/admin/oceanbase/etc
target_path: etc
type: dir
\ No newline at end of file
type: dir
install_method: cp
\ No newline at end of file
......@@ -34,6 +34,7 @@ def critical(*arg, **kwargs):
global_ret = False
stdio.error(*arg, **kwargs)
def init_dir(server, client, key, path, link_path=None):
if force:
ret = client.execute_command('rm -fr %s' % path)
......@@ -66,6 +67,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio = plugin_context.stdio
servers_dirs = {}
force = getattr(plugin_context.options, 'force', False)
clean = getattr(plugin_context.options, 'clean', False)
stdio.verbose('option `force` is %s' % force)
stdio.start_loading('Initializes observer work home')
for server in cluster_config.servers:
......@@ -102,9 +104,20 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
'server': server,
'key': key,
}
stdio.verbose('%s initializes observer work home' % server)
if force:
need_clean = force
if clean and not force:
if client.execute_command('bash -c \'if [[ "$(ls -d {0} 2>/dev/null)" != "" && ! -O {0} ]]; then exit 0; else exit 1; fi\''.format(home_path)):
owner = client.execute_command("ls -ld %s | awk '{print $3}'" % home_path).stdout.strip()
err_msg = ' {} is not empty, and the owner is {}'.format(home_path, owner)
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=err_msg))
continue
need_clean = True
if need_clean:
client.execute_command(
"pkill -9 -u `whoami` -f '^%s/bin/observer -p %s'" % (home_path, server_config['mysql_port']))
ret = client.execute_command('rm -fr %s/*' % home_path)
if not ret:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
......@@ -117,12 +130,10 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
continue
else:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=home_path)))
ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log,bin,lib}"' % home_path) \
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))
ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log,bin,lib}"' % home_path)
if ret:
data_path = server_config['data_dir']
if force:
if need_clean:
ret = client.execute_command('rm -fr %s/*' % data_path)
if not ret:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=data_path)))
......@@ -165,7 +176,6 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.PATH_ONLY.format(path=data_path)))
else:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=home_path)))
if global_ret:
stdio.stop_loading('succeed')
plugin_context.return_true()
......
此差异已折叠。
......@@ -20,7 +20,6 @@
from __future__ import absolute_import, division, print_function
import json
import time
import requests
......@@ -29,9 +28,11 @@ def config_url(ocp_config_server, appname, cid):
cfg_url = '%s&Action=ObRootServiceInfo&ObCluster=%s' % (ocp_config_server, appname)
proxy_cfg_url = '%s&Action=GetObProxyConfig&ObRegionGroup=%s' % (ocp_config_server, appname)
# 清除集群URL内容命令
cleanup_config_url_content = '%s&Action=DeleteObRootServiceInfoByClusterName&ClusterName=%s' % (ocp_config_server, appname)
cleanup_config_url_content = '%s&Action=DeleteObRootServiceInfoByClusterName&ClusterName=%s' % (
ocp_config_server, appname)
# 注册集群信息到Config URL命令
register_to_config_url = '%s&Action=ObRootServiceRegister&ObCluster=%s&ObClusterId=%s' % (ocp_config_server, appname, cid)
register_to_config_url = '%s&Action=ObRootServiceRegister&ObCluster=%s&ObClusterId=%s' % (
ocp_config_server, appname, cid)
return cfg_url, cleanup_config_url_content, register_to_config_url
......@@ -40,7 +41,7 @@ def get_port_socket_inode(client, port):
cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port
res = client.execute_command(cmd)
if not res or not res.stdout.strip():
return False
return []
return res.stdout.strip().split('\n')
......@@ -62,7 +63,6 @@ def stop(plugin_context, *args, **kwargs):
clients = plugin_context.clients
stdio = plugin_context.stdio
global_config = cluster_config.get_global_conf()
global_config = cluster_config.get_global_conf()
appname = global_config['appname'] if 'appname' in global_config else None
cluster_id = global_config['cluster_id'] if 'cluster_id' in global_config else None
obconfig_url = global_config['obconfig_url'] if 'obconfig_url' in global_config else None
......@@ -118,6 +118,14 @@ def stop(plugin_context, *args, **kwargs):
servers = tmp_servers
count -= 1
if count and servers:
if count == 5:
for server in servers:
data = servers[server]
server_config = cluster_config.get_server_conf(server)
client = clients[server]
client.execute_command(
"if [[ -d /proc/%s ]]; then pkill -9 -u `whoami` -f '%s/bin/observer -p %s';fi" %
(data['pid'], server_config['home_path'], server_config['mysql_port']))
time.sleep(3)
if servers:
......
此差异已折叠。
......@@ -212,7 +212,7 @@ def pre_test(plugin_context, cursor, odp_cursor, *args, **kwargs):
user = get_option('user', 'root')
password = get_option('password', '')
warehouses = get_option('warehouses', cpu_total * 20)
load_workers = get_option('load_workers', int(min(min_cpu, (max_memory >> 30) / 2)))
load_workers = get_option('load_workers', int(max(min(min_cpu, (max_memory >> 30) / 2), 1)))
terminals = get_option('terminals', min(cpu_total * 15, warehouses * 10))
run_mins = get_option('run_mins', 10)
test_only = get_option('test_only')
......
此差异已折叠。
requests==2.24.0
rpmfile==1.0.8
paramiko==2.10.1
paramiko==2.7.2
backports.lzma==0.0.14
MySQL-python==1.2.5
ruamel.yaml.clib==0.2.2
......@@ -11,3 +11,6 @@ enum34==1.1.6
progressbar==2.5
halo==0.0.30
pycryptodome==3.10.1
inspect2==0.1.2
six==1.16.0
pyinstaller==3.6
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册