From 5f33e4c80c05050e1687f4b581a08122ab6e9fef Mon Sep 17 00:00:00 2001 From: Rongfeng Fu Date: Mon, 28 Jun 2021 10:29:50 +0800 Subject: [PATCH] 1.0.1 (#13) * update example * 1.0.1 * 1.0.1 --- _cmd.py | 47 ++- _deploy.py | 1 + _mirror.py | 160 ++++++---- _plugin.py | 18 +- _repository.py | 119 ++++--- _rpm.py | 97 +++++- build.sh | 10 +- core.py | 300 ++++++++++++++---- example/distributed-example.yaml | 1 + example/distributed-with-obproxy-example.yaml | 8 +- example/local-example.yaml | 3 +- example/mini-distributed-example.yaml | 1 + ...mini-distributed-with-obproxy-example.yaml | 8 +- example/mini-local-example.yaml | 1 + example/mini-single-example.yaml | 1 + example/mini-single-with-obproxy-example.yaml | 8 +- example/single-example.yaml | 3 +- example/single-with-obproxy-example.yaml | 13 +- ob-deploy.spec | 108 +++++++ plugins/obproxy/3.1.0/bootstrap.py | 2 +- plugins/obproxy/3.1.0/connect.py | 16 +- plugins/obproxy/3.1.0/destroy.py | 4 +- plugins/obproxy/3.1.0/start.py | 17 +- plugins/obproxy/3.1.0/start_check.py | 41 ++- plugins/obproxy/3.1.0/stop.py | 2 +- plugins/obproxy/3.1.0/upgrade.py | 41 +++ plugins/oceanbase/3.1.0/bootstrap.py | 10 +- plugins/oceanbase/3.1.0/connect.py | 15 +- plugins/oceanbase/3.1.0/display.py | 33 +- plugins/oceanbase/3.1.0/init.py | 116 +++++-- plugins/oceanbase/3.1.0/parameter.yaml | 33 +- plugins/oceanbase/3.1.0/reload.py | 5 +- plugins/oceanbase/3.1.0/start.py | 5 + plugins/oceanbase/3.1.0/start_check.py | 144 ++++++--- plugins/oceanbase/3.1.0/stop.py | 10 +- plugins/oceanbase/3.1.0/upgrade.py | 54 ++++ profile/obd.sh | 4 +- tool.py | 41 ++- 38 files changed, 1182 insertions(+), 318 deletions(-) create mode 100644 ob-deploy.spec create mode 100644 plugins/obproxy/3.1.0/upgrade.py create mode 100644 plugins/oceanbase/3.1.0/upgrade.py diff --git a/_cmd.py b/_cmd.py index 61a88b9..d119fd2 100644 --- a/_cmd.py +++ b/_cmd.py @@ -35,7 +35,10 @@ from tool import DirectoryUtil, FileUtil ROOT_IO = IO(1) -VERSION = '1.0.0' +VERSION = '1.0.1' +REVISION = '<CID>' +BUILD_BRANCH = '<B_BRANCH>' +BUILD_TIME = '<B_TIME>' class BaseCommand(object): @@ -92,19 +95,17 @@ class ObdCommand(BaseCommand): version_fobj = FileUtil.open(version_path, 'a+', stdio=ROOT_IO) version_fobj.seek(0) version = version_fobj.read() - if VERSION.split('.') > version.split('.'): + if VERSION != version: obd_plugin_path = os.path.join(self.OBD_PATH, 'plugins') if DirectoryUtil.mkdir(self.OBD_PATH): root_plugin_path = '/usr/obd/plugins' if os.path.exists(root_plugin_path): - ROOT_IO.verbose('copy %s to %s' % (root_plugin_path, obd_plugin_path)) DirectoryUtil.copy(root_plugin_path, obd_plugin_path, ROOT_IO) obd_mirror_path = os.path.join(self.OBD_PATH, 'mirror') obd_remote_mirror_path = os.path.join(self.OBD_PATH, 'mirror/remote') if DirectoryUtil.mkdir(obd_mirror_path): root_remote_mirror = '/usr/obd/mirror/remote' if os.path.exists(root_remote_mirror): - ROOT_IO.verbose('copy %s to %s' % (root_remote_mirror, obd_remote_mirror_path)) DirectoryUtil.copy(root_remote_mirror, obd_remote_mirror_path, ROOT_IO) version_fobj.seek(0) version_fobj.truncate() @@ -375,10 +376,11 @@ class ClusterRedeployCommand(ClusterMirrorCommand): def __init__(self): super(ClusterRedeployCommand, self).__init__('redeploy', 'redeploy a cluster had started') + self.parser.add_option('-f', '--force-kill', action='store_true', help="force kill when observer is running") def _do_command(self, obd): if self.cmds: - return obd.redeploy_cluster(self.cmds[0]) + return obd.redeploy_cluster(self.cmds[0], self.opts) else: return self._show_help() @@
-419,6 +421,19 @@ class ClusterEditConfigCommand(ClusterMirrorCommand): return self._show_help() +class ClusterUpgradeCommand(ClusterMirrorCommand): + + def __init__(self): + super(ClusterUpgradeCommand, self).__init__('upgrade', 'upgrade cluster') + self.parser.add_option('-f', '--force', action='store_true', help="force upgrade") + self.parser.add_option('-c', '--components', type='string', help="the updated component list, separated by `,`") + + def _do_command(self, obd): + if self.cmds: + return obd.upgrade_cluster(self.cmds[0], self.opts) + else: + return self._show_help() + class ClusterMajorCommand(MajorCommand): def __init__(self): @@ -433,6 +448,7 @@ class ClusterMajorCommand(MajorCommand): self.register_command(ClusterRedeployCommand()) self.register_command(ClusterEditConfigCommand()) self.register_command(ClusterReloadCommand()) + self.register_command(ClusterUpgradeCommand()) class TestMirrorCommand(ObdCommand): @@ -492,6 +508,21 @@ class BenchMajorCommand(MajorCommand): super(BenchMajorCommand, self).__init__('bench', '') +class UpdateCommand(ObdCommand): + + def __init__(self): + super(UpdateCommand, self).__init__('update', 'update obd') + + def do_command(self): + if os.getuid() != 0: + ROOT_IO.error('You need to be root to perform this command.') + return False + return super(UpdateCommand, self).do_command() + + def _do_command(self, obd): + return obd.update_obd(VERSION) + + class MainCommand(MajorCommand): def __init__(self): @@ -499,11 +530,15 @@ class MainCommand(MajorCommand): self.register_command(MirrorMajorCommand()) self.register_command(ClusterMajorCommand()) self.register_command(TestMajorCommand()) + self.register_command(UpdateCommand()) self.parser.version = '''OceanBase Deploy: %s +REVISION: %s +BUILD_BRANCH: %s +BUILD_TIME: %s Copyright (C) 2021 OceanBase License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>. This is free software: you are free to change and redistribute it.
-There is NO WARRANTY, to the extent permitted by law.''' % (VERSION) +There is NO WARRANTY, to the extent permitted by law.''' % (VERSION, REVISION, BUILD_BRANCH, BUILD_TIME) self.parser._add_version_option() if __name__ == '__main__': diff --git a/_deploy.py b/_deploy.py index 81a7dee..a2f9cf9 100644 --- a/_deploy.py +++ b/_deploy.py @@ -94,6 +94,7 @@ class ClusterConfig(object): self.version = version self.tag = tag self.name = name + self.origin_package_hash = package_hash self.package_hash = package_hash self._temp_conf = {} self._default_conf = {} diff --git a/_mirror.py b/_mirror.py index b7a8554..bc632a0 100644 --- a/_mirror.py +++ b/_mirror.py @@ -30,6 +30,7 @@ import string import requests from glob import glob from enum import Enum +from copy import deepcopy from xml.etree import cElementTree try: from ConfigParser import ConfigParser @@ -37,7 +38,7 @@ except: from configparser import ConfigParser from _arch import getArchList, getBaseArch -from _rpm import Package +from _rpm import Package, PackageInfo from tool import ConfigUtil, FileUtil from _manager import Manager @@ -45,17 +46,31 @@ from _manager import Manager _KEYCRE = re.compile(r"\$(\w+)") _ARCH = getArchList() _RELEASE = None -for path in glob('/etc/*-release'): - with FileUtil.open(path) as f: - info = f.read() - m = re.search('VERSION_ID="(\d+)', info) - if m: - _RELEASE = m.group(1) - break +SUP_MAP = { + 'ubuntu': (([16], 7), ), + 'debian': (([9], 7), ), + 'opensuse-leap': (([15], 7), ), + 'sles': (([15, 2], 7), ), + 'fedora': (([33], 7), ), +} _SERVER_VARS = { 'basearch': getBaseArch(), - 'releasever': _RELEASE } +with FileUtil.open('/etc/os-release') as f: + for line in f.readlines(): + line = line.strip() + if not line: + continue + try: + k, v = line.split('=', 1) + _SERVER_VARS[k] = v.strip('"').strip("'") + except: + pass + if 'VERSION_ID' in _SERVER_VARS: + m = re.match('\d+', _SERVER_VARS['VERSION_ID']) + if m: + _RELEASE = m.group(0) +_SERVER_VARS['releasever'] = _RELEASE @@ -107,24 +122,25 @@ class MirrorRepository(object): class RemoteMirrorRepository(MirrorRepository): - class RemotePackageInfo(object): + class RemotePackageInfo(PackageInfo): def __init__(self, elem): - self.name = None - self.arch = None self.epoch = None - self.release = None - self.version = None self.location = (None, None) self.checksum = (None,None) # type,value self.openchecksum = (None,None) # type,value self.time = (None, None) + super(RemoteMirrorRepository.RemotePackageInfo, self).__init__(None, None, None, None, None) self._parser(elem) @property def md5(self): return self.checksum[1] + @md5.setter + def md5(self, value): + self.checksum = (self.checksum[0], value) + def __str__(self): url = self.location[1] if self.location[0]: @@ -152,13 +168,13 @@ class RemoteMirrorRepository(MirrorRepository): elif child_name == 'version': self.epoch = child.attrib.get('epoch') - self.version = child.attrib.get('ver') - self.release = child.attrib.get('rel') + self.set_version(child.attrib.get('ver')) + self.set_release(child.attrib.get('rel')) elif child_name == 'time': build = child.attrib.get('build') _file = child.attrib.get('file') - self.location = (_file, build) + self.time = (int(_file), int(build)) elif child_name == 'arch': self.arch = child.text @@ -227,13 +243,13 @@ class RemoteMirrorRepository(MirrorRepository): self._db = None self._repomds = None super(RemoteMirrorRepository, self).__init__(mirror_path, stdio=stdio) - self.baseurl = self.var_replace(meta_data['baseurl'], _SERVER_VARS) + self.baseurl = meta_data['baseurl'] 
self.gpgcheck = ConfigUtil.get_value_from_dict(meta_data, 'gpgcheck', 0, int) > 0 self.priority = 100 - ConfigUtil.get_value_from_dict(meta_data, 'priority', 99, int) if os.path.exists(mirror_path): self._load_repo_age() repo_age = ConfigUtil.get_value_from_dict(meta_data, 'repo_age', 0, int) - if repo_age > self.repo_age: + if repo_age > self.repo_age or int(time.time()) - 86400 > self.repo_age: self.repo_age = repo_age self.update_mirror() @@ -373,7 +389,7 @@ class RemoteMirrorRepository(MirrorRepository): file_name = pkg_info.location[1] file_path = os.path.join(self.mirror_path, file_name) self.stdio and getattr(self.stdio, 'verbose', print)('get RPM package by %s' % pkg_info) - if not os.path.exists(file_path) or os.stat(file_path)[8] < self.repo_age: + if not os.path.exists(file_path) or os.stat(file_path)[8] < pkg_info.time[1]: base_url = pkg_info.location[0] if pkg_info.location[0] else self.baseurl url = '%s/%s' % (base_url, pkg_info.location[1]) if not self.download_file(url, file_path, self.stdio): @@ -383,7 +399,7 @@ class RemoteMirrorRepository(MirrorRepository): def get_pkgs_info(self, **pattern): matchs = self.get_pkgs_info_with_score(**pattern) if matchs: - return [info for info in sorted(matchs, key=lambda x: x[1], reversed=True)] + return [info for info in sorted(matchs, key=lambda x: x[1], reverse=True)] return matchs def get_best_pkg_info(self, **pattern): @@ -393,19 +409,22 @@ class RemoteMirrorRepository(MirrorRepository): return None def get_exact_pkg_info(self, **pattern): - self.stdio and getattr(self.stdio, 'verbose', print)('check md5 in pattern or not') if 'md5' in pattern and pattern['md5']: + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is %s' % pattern['md5']) return self.db[pattern['md5']] if pattern['md5'] in self.db else None - self.stdio and getattr(self.stdio, 'verbose', print)('check name in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is None') if 'name' not in pattern and not pattern['name']: + self.stdio and getattr(self.stdio, 'verbose', print)('name is None') return None name = pattern['name'] - self.stdio and getattr(self.stdio, 'verbose', print)('check arch in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('name is %s' % name) arch = getArchList(pattern['arch']) if 'arch' in pattern and pattern['arch'] else _ARCH - self.stdio and getattr(self.stdio, 'verbose', print)('check release in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('arch is %s' % arch) release = pattern['release'] if 'release' in pattern else None - self.stdio and getattr(self.stdio, 'verbose', print)('check version in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('release is %s' % release) version = pattern['version'] if 'version' in pattern else None + self.stdio and getattr(self.stdio, 'verbose', print)('version is %s' % version) + pkgs = [] for key in self.db: info = self.db[key] if info.name != name: @@ -416,25 +435,33 @@ class RemoteMirrorRepository(MirrorRepository): continue if version and version != info.version: continue - return info - return None + pkgs.append(info) + if pkgs: + pkgs.sort() + return pkgs[-1] + else: + return None def get_pkgs_info_with_score(self, **pattern): matchs = [] - self.stdio and getattr(self.stdio, 'verbose', print)('check md5 in pattern or not') if 'md5' in pattern and pattern['md5']: + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is %s' % pattern['md5']) return [self.db[pattern['md5']], (0xfffffffff, )] if 
pattern['md5'] in self.db else matchs - self.stdio and getattr(self.stdio, 'verbose', print)('check name in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is None') if 'name' not in pattern and not pattern['name']: + self.stdio and getattr(self.stdio, 'verbose', print)('name is None') return matchs - self.stdio and getattr(self.stdio, 'verbose', print)('check arch in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('name is %s' % pattern['name']) if 'arch' in pattern and pattern['arch']: pattern['arch'] = getArchList(pattern['arch']) else: pattern['arch'] = _ARCH - self.stdio and getattr(self.stdio, 'verbose', print)('check version in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('arch is %s' % pattern['arch']) if 'version' in pattern and pattern['version']: pattern['version'] += '.' + else: + pattern['version'] = None + self.stdio and getattr(self.stdio, 'verbose', print)('version is %s' % pattern['version']) for key in self.db: info = self.db[key] if pattern['name'] in info.name: @@ -448,8 +475,7 @@ class RemoteMirrorRepository(MirrorRepository): if version and info_version.find(version) != 0: return [0 ,] - c = info.version.split('.') - c.insert(0, len(name) / len(info.name)) + c = [len(name) / len(info.name), info] return c @staticmethod @@ -524,6 +550,7 @@ class LocalMirrorRepository(MirrorRepository): continue self.db[key] = data except: + self.stdio.exception('') pass def _dump_db(self): @@ -533,6 +560,7 @@ class LocalMirrorRepository(MirrorRepository): pickle.dump(self.db, f) return True except: + self.stdio.exception('') pass return False @@ -587,7 +615,7 @@ class LocalMirrorRepository(MirrorRepository): def get_pkgs_info(self, **pattern): matchs = self.get_pkgs_info_with_score(**pattern) if matchs: - return [info for info in sorted(matchs, key=lambda x: x[1], reversed=True)] + return [info for info in sorted(matchs, key=lambda x: x[1], reverse=True)] return matchs def get_best_pkg_info(self, **pattern): @@ -597,19 +625,22 @@ class LocalMirrorRepository(MirrorRepository): return None def get_exact_pkg_info(self, **pattern): - self.stdio and getattr(self.stdio, 'verbose', print)('check md5 in pattern or not') if 'md5' in pattern and pattern['md5']: + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is %s' % pattern['md5']) return self.db[pattern['md5']] if pattern['md5'] in self.db else None - self.stdio and getattr(self.stdio, 'verbose', print)('check name in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is None') if 'name' not in pattern and not pattern['name']: + self.stdio and getattr(self.stdio, 'verbose', print)('name is None') return None name = pattern['name'] - self.stdio and getattr(self.stdio, 'verbose', print)('check arch in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('name is %s' % name) arch = getArchList(pattern['arch']) if 'arch' in pattern and pattern['arch'] else _ARCH - self.stdio and getattr(self.stdio, 'verbose', print)('check release in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('arch is %s' % arch) release = pattern['release'] if 'release' in pattern else None - self.stdio and getattr(self.stdio, 'verbose', print)('check version in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('release is %s' % release) version = pattern['version'] if 'version' in pattern else None + self.stdio and getattr(self.stdio, 'verbose', print)('version is %s' % version) + pkgs = [] for 
key in self.db: info = self.db[key] if info.name != name: @@ -620,8 +651,12 @@ class LocalMirrorRepository(MirrorRepository): continue if version and version != info.version: continue - return info - return None + pkgs.append(info) + if pkgs: + pkgs.sort() + return pkgs[-1] + else: + return None def get_best_pkg_info_with_score(self, **pattern): matchs = self.get_pkgs_info_with_score(**pattern) @@ -631,20 +666,23 @@ class LocalMirrorRepository(MirrorRepository): def get_pkgs_info_with_score(self, **pattern): matchs = [] - self.stdio and getattr(self.stdio, 'verbose', print)('check md5 in pattern or not') if 'md5' in pattern and pattern['md5']: + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is %s' % pattern['md5']) return [self.db[pattern['md5']], (0xfffffffff, )] if pattern['md5'] in self.db else matchs - self.stdio and getattr(self.stdio, 'verbose', print)('check name in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('md5 is None') if 'name' not in pattern and not pattern['name']: return matchs - self.stdio and getattr(self.stdio, 'verbose', print)('check arch in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('name is %s' % pattern['name']) if 'arch' in pattern and pattern['arch']: pattern['arch'] = getArchList(pattern['arch']) else: pattern['arch'] = _ARCH - self.stdio and getattr(self.stdio, 'verbose', print)('check version in pattern or not') + self.stdio and getattr(self.stdio, 'verbose', print)('arch is %s' % pattern['arch']) if 'version' in pattern and pattern['version']: pattern['version'] += '.' + else: + pattern['version'] = None + self.stdio and getattr(self.stdio, 'verbose', print)('version is %s' % pattern['version']) for key in self.db: info = self.db[key] if pattern['name'] in info.name: @@ -658,8 +696,7 @@ class LocalMirrorRepository(MirrorRepository): if version and info_version.find(version) != 0: return [0 ,] - c = info.version.split('.') - c.insert(0, len(name) / len(info.name)) + c = [len(name) / len(info.name), info] return c def get_info_list(self): @@ -685,6 +722,17 @@ class MirrorRepositoryManager(Manager): def get_remote_mirrors(self): mirrors = [] + server_vars = deepcopy(_SERVER_VARS) + linux_id = server_vars.get('ID') + if linux_id in SUP_MAP: + version = [int(vnum) for vnum in server_vars.get('VERSION_ID', '').split('.')] + for vid, elvid in SUP_MAP[linux_id]: + if version < vid: + break + server_vars['releasever'] = elvid + server_vars['releasever'] = str(elvid) + self.stdio and getattr(self.stdio, 'warn', print)('Use centos %s remote mirror repository for %s %s' % (server_vars['releasever'], linux_id, server_vars.get('VERSION_ID'))) + for path in glob(os.path.join(self.remote_path, '*.repo')): repo_age = os.stat(path)[8] with open(path, 'r') as confpp_obj: @@ -702,11 +750,14 @@ class MirrorRepositoryManager(Manager): meta_data[attr] = value if 'enabled' in meta_data and not meta_data['enabled']: continue + if 'baseurl' not in meta_data: + continue if 'name' not in meta_data: meta_data['name'] = section if 'repo_age' not in meta_data: meta_data['repo_age'] = repo_age - meta_data['name'] = RemoteMirrorRepository.var_replace(meta_data['name'], _SERVER_VARS) + meta_data['name'] = RemoteMirrorRepository.var_replace(meta_data['name'], server_vars) + meta_data['baseurl'] = RemoteMirrorRepository.var_replace(meta_data['baseurl'], server_vars) mirror_path = os.path.join(self.remote_path, meta_data['name']) mirror = RemoteMirrorRepository(mirror_path, meta_data, self.stdio) mirrors.append(mirror) @@ 
-720,11 +771,12 @@ class MirrorRepositoryManager(Manager): def get_exact_pkg(self, **pattern): only_info = 'only_info' in pattern and pattern['only_info'] mirrors = self.get_mirrors() + info = [None, None] for mirror in mirrors: - info = mirror.get_exact_pkg_info(**pattern) - if info: - return info if only_info else mirror.get_rpm_pkg_by_info(info) - return None + new_one = mirror.get_exact_pkg_info(**pattern) + if new_one and new_one > info[0]: + info = [new_one, mirror] + return info[0] if info[0] is None or only_info else info[1].get_rpm_pkg_by_info(info[0]) def get_best_pkg(self, **pattern): if 'fuzzy' not in pattern or not pattern['fuzzy']: @@ -750,7 +802,7 @@ class MirrorRepositoryManager(Manager): def add_local_mirror(self, src, force=False): self.stdio and getattr(self.stdio, 'verbose', print)('%s is file or not' % src) if not os.path.isfile(src): - self.stdio and getattr(self.stdio, 'error', print)('%s does not exist or no such file: %s' % src) + self.stdio and getattr(self.stdio, 'error', print)('No such file: %s' % (src)) return None try: self.stdio and getattr(self.stdio, 'verbose', print)('load %s to Package Object' % src) diff --git a/_plugin.py b/_plugin.py index d8ab0d8..0ff197f 100644 --- a/_plugin.py +++ b/_plugin.py @@ -27,6 +27,7 @@ from glob import glob from copy import deepcopy from _manager import Manager +from _rpm import Version from tool import ConfigUtil, DynamicLoading, YamlLoader @@ -51,10 +52,10 @@ class Plugin(object): raise NotImplementedError self.component_name = component_name self.plugin_path = plugin_path - self.version = version.split('.') + self.version = Version(version) def __str__(self): - return '%s-%s-%s' % (self.component_name, self.PLUGIN_TYPE.name.lower(), '.'.join(self.version)) + return '%s-%s-%s' % (self.component_name, self.PLUGIN_TYPE.name.lower(), self.version) @property def mirror_type(self): @@ -327,12 +328,18 @@ class ParamPlugin(Plugin): class InstallPlugin(Plugin): + class FileItemType(Enum): + + FILE = 0 + DIR = 1 + BIN = 2 + class FileItem(object): def __init__(self, src_path, target_path, _type): self.src_path = src_path self.target_path = target_path - self.type = _type if _type else 'file' + self.type = _type if _type else InstallPlugin.FileItemType.FILE PLUGIN_TYPE = PluginType.INSTALL FILES_MAP_YAML = 'file_map.yaml' @@ -357,7 +364,7 @@ class InstallPlugin(Plugin): self._file_map[k] = InstallPlugin.FileItem( k, ConfigUtil.get_value_from_dict(data, 'target_path', k), - ConfigUtil.get_value_from_dict(data, 'type', None) + getattr(InstallPlugin.FileItemType, ConfigUtil.get_value_from_dict(data, 'type', 'FILE').upper(), None) ) except: pass @@ -369,6 +376,7 @@ class InstallPlugin(Plugin): + class ComponentPluginLoader(object): PLUGIN_TYPE = None @@ -400,7 +408,7 @@ class ComponentPluginLoader(object): return plugins def get_best_plugin(self, version): - version = version.split('.') + version = Version(version) plugins = [] for plugin in self.get_plugins(): if plugin.version == version: diff --git a/_repository.py b/_repository.py index dbc6692..7e6e68b 100644 --- a/_repository.py +++ b/_repository.py @@ -22,13 +22,15 @@ from __future__ import absolute_import, division, print_function import os import sys +import time import hashlib from glob import glob -from _rpm import Package +from _rpm import Package, PackageInfo, Version from _arch import getBaseArch from tool import DirectoryUtil, FileUtil, YamlLoader from _manager import Manager +from _plugin import InstallPlugin class LocalPackage(Package): @@ -59,15 +61,18 @@ class 
LocalPackage(Package): def __init__(self, path, name, version, files, release=None, arch=None): self.name = name - self.version = version + self.set_version(version) + self.set_release(release if release else time.strftime("%Y%m%d%H%M%S", time.localtime(time.time()))) self.md5 = None - self.release = release if release else version self.arch = arch if arch else getBaseArch() self.headers = {} self.files = files self.path = path self.package() + def __hash__(self): + return hash(self.path) + def package(self): count = 0 dirnames = [] @@ -114,17 +119,19 @@ class LocalPackage(Package): return self.RpmObject(self.headers, self.files) -class Repository(object): +class Repository(PackageInfo): _DATA_FILE = '.data' def __init__(self, name, repository_dir, stdio=None): self.repository_dir = repository_dir - self.name = name - self.version = None - self.hash = None + super(Repository, self).__init__(name, None, None, None, None) self.stdio = stdio self._load() + + @property + def hash(self): + return self.md5 def __str__(self): return '%s-%s-%s' % (self.name, self.version, self.hash) @@ -142,16 +149,11 @@ class Repository(object): path = os.readlink(self.repository_dir) if os.path.islink(self.repository_dir) else self.repository_dir return os.path.join(path, Repository._DATA_FILE) - # Unclear whether the open-source rpm requirename contains only the required dependencies - def require_list(self): - return [] - - # Unclear whether the open-source rpm requirename contains only the required dependencies, so for now use ldd on the bin files to check dependencies def bin_list(self, plugin): files = [] if self.version and self.hash: for file_item in plugin.file_list(): - if file_item.type == 'bin': + if file_item.type == InstallPlugin.FileItemType.BIN: files.append(os.path.join(self.repository_dir, file_item.target_path)) return files @@ -173,13 +175,16 @@ class Repository(object): return self.version == other.version and self.hash == other.hash if isinstance(other, dict): return self.version == other['version'] and self.hash == other['hash'] + return super(Repository, self).__eq__(other) def _load(self): try: with open(self.data_file_path, 'r') as f: data = YamlLoader().load(f) - self.version = data['version'] - self.hash = data['hash'] + self.set_version(data.get('version')) + self.set_release(data.get('release')) + self.md5 = data.get('hash') + self.arch = data.get('arch') except: pass @@ -192,10 +197,10 @@ class Repository(object): path, _hash = os.path.split(path) path, version = os.path.split(path) if not self.version: - self.version = version + self.set_version(version) def _dump(self): - data = {'version': self.version, 'hash': self.hash} + data = {'version': self.version, 'hash': self.hash, 'release': self.release, 'arch': self.arch} try: with open(self.data_file_path, 'w') as f: YamlLoader().dump(data, f) @@ -213,8 +218,18 @@ class Repository(object): return True self.clear() try: - file_map = plugin.file_map with pkg.open() as rpm: + file_map = plugin.file_map + need_dirs = {} + need_files = {} + for src_path in file_map: + file_item = file_map[src_path] + if file_item.type == InstallPlugin.FileItemType.DIR: + if not src_path.endswith('/'): + src_path += '/' + need_dirs[src_path] = file_item.target_path + else: + need_files[src_path] = file_item.target_path files = {} links = {} dirnames = rpm.headers.get("dirnames") @@ -223,19 +238,29 @@ class Repository(object): filelinktos = rpm.headers.get("filelinktos") filemd5s = rpm.headers.get("filemd5s") filemodes = rpm.headers.get("filemodes") + dirs = sorted(need_dirs.keys(), reverse=True) + format_str = lambda s: s.decode(errors='replace') if isinstance(s, bytes) else s for i in
range(len(basenames)): - path = os.path.join(dirnames[dirindexes[i]], basenames[i]) - if isinstance(path, bytes): - path = path.decode() - if not path.startswith('./'): - path = '.%s' % path + if not filemd5s[i] and not filelinktos[i]: + continue + dir_path = format_str(dirnames[dirindexes[i]]) + if not dir_path.startswith('./'): + dir_path = '.%s' % dir_path + file_name = format_str(basenames[i]) + path = os.path.join(dir_path, file_name) files[path] = i - for src_path in file_map: + if path not in need_files: + for n_dir in need_dirs: + if path.startswith(n_dir): + need_files[path] = os.path.join(n_dir, path[len(n_dir):]) + break + for src_path in need_files: if src_path not in files: raise Exception('%s not found in packge' % src_path) + target_path = os.path.join(self.repository_dir, need_files[src_path]) + if os.path.exists(target_path): + return idx = files[src_path] - file_item = file_map[src_path] - target_path = os.path.join(self.repository_dir, file_item.target_path) if filemd5s[idx]: fd = rpm.extractfile(src_path) self.stdio and getattr(self.stdio, 'verbose', print)('extract %s to %s' % (src_path, target_path)) @@ -248,11 +273,17 @@ class Repository(object): links[target_path] = filelinktos[idx] else: raise Exception('%s is directory' % src_path) + for link in links: - self.stdio and getattr(self.stdio, 'verbose', print)('link %s to %s' % (src_path, target_path)) + self.stdio and getattr(self.stdio, 'verbose', print)('link %s to %s' % (links[link], link)) os.symlink(links[link], link) - self.version = pkg.version - self.hash = pkg.md5 + for n_dir in need_dirs: + if not os.path.isdir(n_dir): + raise Exception('%s: No such dir: %s' % (pkg.path, n_dir)) + self.set_version(pkg.version) + self.set_release(pkg.release) + self.md5 = pkg.md5 + self.arch = pkg.arch if self._dump(): return True else: @@ -263,7 +294,10 @@ class Repository(object): return False def clear(self): - return DirectoryUtil.rm(self.repository_dir, self.stdio) and DirectoryUtil.mkdir(self.repository_dir, stdio=self.stdio) + if os.path.exists(self.repository_dir): + return DirectoryUtil.rm(self.repository_dir, self.stdio) and DirectoryUtil.mkdir(self.repository_dir, stdio=self.stdio) + return True + class ComponentRepository(object): @@ -300,12 +334,17 @@ class ComponentRepository(object): return repositories def get_repository_by_version(self, version, tag=None): + if tag: + return self.get_repository_by_tag(tag, version) + repository = self.get_repository_by_tag(self.name, version) + if repository: + return repository path_partten = os.path.join(self.repository_dir, version, tag if tag else '*') for path in glob(path_partten): - repository = Repository(self.name, path, self.stdio) - if repository.hash: - return repository - return None + n_repository = Repository(self.name, path, self.stdio) + if n_repository.hash and n_repository > repository: + repository = n_repository + return repository def get_repository_by_tag(self, tag, version=None): path_partten = os.path.join(self.repository_dir, version if version else '*', tag) @@ -316,15 +355,17 @@ class ComponentRepository(object): return None def get_repository(self, version=None, tag=None): + if tag: + return self.get_repository_by_tag(tag, version) if version: return self.get_repository_by_version(version, tag) - version = [] + version = None for rep_version in os.listdir(self.repository_dir): - rep_version = rep_version.split('.') + rep_version = Version(rep_version) if rep_version > version: version = rep_version if version: - return 
self.get_repository_by_version('.'.join(version), tag) + return self.get_repository_by_version(version, tag) return None @@ -363,8 +404,6 @@ class RepositoryManager(Manager): def get_repository(self, name, version=None, tag=None, instance=True): if version: return self.get_repository_by_version(name, version, tag) - if not tag: - tag = name if name not in self.component_repositoies: path = os.path.join(self.path, name) self.component_repositoies[name] = ComponentRepository(name, path, self.stdio) @@ -388,7 +427,7 @@ class RepositoryManager(Manager): self.repositories[path] = Repository(name, path, self.stdio) return self.repositories[path] repository = Repository(name, path, self.stdio) - repository.version = version + repository.set_version(version) return repository def create_tag_for_repository(self, repository, tag, force=False): diff --git a/_rpm.py b/_rpm.py index 007b532..33ca76d 100644 --- a/_rpm.py +++ b/_rpm.py @@ -21,6 +21,7 @@ from __future__ import absolute_import, division, print_function import os +import re import sys import rpmfile @@ -35,21 +36,105 @@ if sys.version_info.major == 2: from backports import lzma setattr(sys.modules['rpmfile'], 'lzma', getattr(sys.modules[__name__], 'lzma')) +class Version(str): -class Package(object): + def __init__(self, bytes_or_buffer, encoding=None, errors=None): + super(Version, self).__init__() + + @property + def __cmp_value__(self): + return [(int(_i), _s) for _i, _s in re.findall('(\d+)([^\.]*)', self.__str__())] + + def __gt__(self, value): + if value is None: + return True + return self.__cmp_value__ > self.__class__(value).__cmp_value__ + + def __ge__(self, value): + return self.__eq__(value) or self.__gt__(value) + + def __lt__(self, value): + if value is None: + return False + return self.__cmp_value__ < self.__class__(value).__cmp_value__ + + def __le__(self, value): + return self.__eq__(value) or self.__lt__(value) + +class Release(Version): + + @property + def __cmp_value__(self): + m = re.search('(\d+)', self.__str__()) + return int(m.group(0)) if m else -1 + + +class PackageInfo(object): + + def __init__(self, name, version, release, arch, md5): + self.name = name + self.set_version(version) + self.set_release(release) + self.arch = arch + self.md5 = md5 + + def set_version(self, version): + self.version = Version(str(version) if version else '') + + def set_release(self, release): + self.release = Release(str(release) if release else '') + + def __str__(self): + return 'name: %s\nversion: %s\nrelease:%s\narch: %s\nmd5: %s' % (self.name, self.version, self.release, self.arch, self.md5) + + @property + def __cmp_value__(self): + return [self.version, self.release] + + def __eq__(self, value): + if value is None: + return False + return self.md5 == value.md5 + + def __ne__(self, value): + return not self.__eq__(value) + + def __gt__(self, value): + return value is None or self.__cmp_value__ > value.__cmp_value__ + + def __ge__(self, value): + return value is None or self.__eq__(value) or self.__cmp_value__ >= value.__cmp_value__ + + def __lt__(self, value): + if value is None: + return False + return self.__cmp_value__ < value.__cmp_value__ + + def __le__(self, value): + if value is None: + return False + return self.__eq__(value) or self.__cmp_value__ <= value.__cmp_value__ + + +class Package(PackageInfo): def __init__(self, path): self.path = path with self.open() as rpm: - self.name = rpm.headers.get('name').decode() - self.version = rpm.headers.get('version').decode() - self.release = rpm.headers.get('release').decode() 
- self.arch = rpm.headers.get('arch').decode() - self.md5 = rpm.headers.get('md5').decode() + super(Package, self).__init__( + name = rpm.headers.get('name').decode(), + version = rpm.headers.get('version').decode(), + release = rpm.headers.get('release').decode(), + arch = rpm.headers.get('arch').decode(), + md5 = rpm.headers.get('md5').decode() + ) def __str__(self): return 'name: %s\nversion: %s\nrelease:%s\narch: %s\nmd5: %s' % (self.name, self.version, self.release, self.arch, self.md5) + def __hash__(self): + return hash(self.path) + @property def file_name(self): return '%s-%s-%s.%s.rpm' % (self.name, self.version, self.release, self.arch) diff --git a/build.sh b/build.sh index 92c2590..6c31f1b 100644 --- a/build.sh +++ b/build.sh @@ -1,4 +1,4 @@ -# /bin/bash +#!/bin/bash if [ `id -u` != 0 ] ; then echo "Please use root to run" fi @@ -25,6 +25,10 @@ cp -f /usr/obd/profile/obd.sh /etc/profile.d/obd.sh rm -fr /usr/obd/mirror/remote && mkdir -p /usr/obd/mirror/remote cd /usr/obd/mirror/remote && wget https://mirrors.aliyun.com/oceanbase/OceanBase.repo rm -fr /usr/bin/obd -echo -e "# /bin/bash\n$python_bin /usr/obd/_cmd.py \$*" > /usr/bin/obd +CID=`git log |head -n1 | awk -F' ' '{print $2}'` +BRANCH=`git branch | grep -e "^\*" | awk -F' ' '{print $2}'` +DATE=`date '+%b %d %Y %H:%M:%S'` +cat /usr/obd/_cmd.py | sed "s/<CID>/$CID/" | sed "s/<B_BRANCH>/$BRANCH/" | sed "s/<B_TIME>/$DATE/" > /usr/obd/obd.py +echo -e "#!/bin/bash\n$python_bin /usr/obd/obd.py \$*" > /usr/bin/obd chmod +x /usr/bin/obd -echo -e 'Installation of obd finished successfully\nPlease source /etc/profile.d/obd.sh to enable it' +echo -e 'Installation of obd finished successfully\nPlease source /etc/profile.d/obd.sh to enable it' \ No newline at end of file diff --git a/core.py b/core.py index 5af07ab..064978c 100644 --- a/core.py +++ b/core.py @@ -35,8 +35,8 @@ from halo import Halo from ssh import SshClient, SshConfig from tool import ConfigUtil, FileUtil, DirectoryUtil, YamlLoader from _stdio import MsgLevel -from _mirror import MirrorRepositoryManager -from _plugin import PluginManager, PluginType +from _mirror import MirrorRepositoryManager, PackageInfo +from _plugin import PluginManager, PluginType, InstallPlugin from _repository import RepositoryManager, LocalPackage from _deploy import DeployManager, DeployStatus, DeployConfig, DeployConfigStatus @@ -159,7 +159,7 @@ class ObdHome(object): ssh_clients[server].connect() def search_plugin(self, repository, plugin_type, no_found_exit=True): - self._call_stdio('verbose', 'Search %s plugin for %s' % (plugin_type.name.lower(), repository)) + self._call_stdio('verbose', 'Search %s plugin for %s' % (plugin_type.name.lower(), repository.name)) plugin = self.plugin_manager.get_best_plugin(plugin_type, repository.name, repository.version) if plugin: self._call_stdio('verbose', 'Found for %s for %s-%s' % (plugin, repository.name, repository.version)) @@ -198,7 +198,7 @@ class ObdHome(object): self._call_stdio('warn', 'No such %s plugin for %s-%s' % (script_name, repository.name, repository.version)) return plugins - def search_components_from_mirrors(self, deploy_config, fuzzy_match=False, only_info=True): + def search_components_from_mirrors(self, deploy_config, fuzzy_match=False, only_info=True, update_if_need=None): pkgs = [] errors = [] repositories = [] @@ -209,20 +209,28 @@ class ObdHome(object): self._call_stdio('verbose', 'Get %s repository' % component) repository = self.repository_manager.get_repository(component, config.version, config.package_hash if config.package_hash else config.tag)
- self._call_stdio('verbose', 'Check %s version for the repository' % repository) - if repository and repository.hash: - repositories.append(repository) - self._call_stdio('verbose', 'Use repository %s' % repository) - self._call_stdio('print', '%s-%s already installed' % (repository.name, repository.version)) - continue + if repository and not repository.hash: + repository = None self._call_stdio('verbose', 'Search %s package from mirror' % component) pkg = self.mirror_manager.get_best_pkg(name=component, version=config.version, md5=config.package_hash, fuzzy_match=fuzzy_match, only_info=only_info) - if pkg: - self._call_stdio('verbose', 'Package %s-%s is available.' % (pkg.name, pkg.version)) + if repository or pkg: + if pkg: + self._call_stdio('verbose', 'Found Package %s-%s-%s' % (pkg.name, pkg.version, pkg.md5)) + if repository: + if repository >= pkg or ( + ( + update_if_need is None and + not self._call_stdio('confirm', 'Found a higher version\n%s\nDo you want to use it?' % pkg) + ) or update_if_need is False + ): + repositories.append(repository) + self._call_stdio('verbose', 'Use repository %s' % repository) + self._call_stdio('print', '%s-%s already installed.' % (repository.name, repository.version)) + continue if config.version and pkg.version != config.version: - self._call_stdio('warn', 'No such package %s-%s. Use similar package %s-%s.' % (component, config.version, pkg.name, pkg.version)) + self._call_stdio('warn', 'No such package %s-%s. Use similar package %s-%s.' % (component, config.version, pkg.name, pkg.version)) else: - self._call_stdio('print', 'Package %s-%s is available' % (pkg.name, pkg.version)) + self._call_stdio('print', 'Package %s-%s is available.' % (pkg.name, pkg.version)) repository = self.repository_manager.get_repository(pkg.name, pkg.md5) if repository: repositories.append(repository) @@ -346,7 +354,7 @@ class ObdHome(object): if confirm('Modifying the version and hash of the component is not permitted.'): continue return False - pkgs, repositories, errors = self.search_components_from_mirrors(deploy_config) + pkgs, repositories, errors = self.search_components_from_mirrors(deploy_config, update_if_need=False) # Loading the parameter plugins that are available to the application self._call_stdio('start_loading', 'Search param plugin and load') for repository in repositories: @@ -460,7 +468,9 @@ class ObdHome(object): self._call_stdio('error', 'Failed to extract file from %s' % pkg.path) return None self._call_stdio('stop_loading', 'succeed') - self.repository_manager.create_tag_for_repository(repository, pkg.name) + head_repository = self.repository_manager.get_repository(pkg.name, pkg.version, pkg.name) + if repository > head_repository: + self.repository_manager.create_tag_for_repository(repository, pkg.name, True) repositories.append(repository) return install_plugins @@ -593,6 +603,7 @@ class ObdHome(object): # If the cluster states are consistent, the status value is returned. Else False is returned. 
def cluster_status_check(self, ssh_clients, deploy_config, repositories, ret_status={}): + self._call_stdio('start_loading', 'Cluster status check') status_plugins = self.search_py_script_plugin(repositories, 'status') component_status = {} for repository in repositories: @@ -610,6 +621,7 @@ class ObdHome(object): break else: continue + self._call_stdio('stop_loading', 'succeed') return False status = None for repository in component_status: @@ -618,7 +630,9 @@ class ObdHome(object): continue if status != component_status[repository]: self._call_stdio('verbose', 'Deploy status inconsistent') + self._call_stdio('stop_loading', 'succeed') return False + self._call_stdio('stop_loading', 'succeed') return status def deploy_cluster(self, name, opt=Values()): @@ -675,10 +689,15 @@ class ObdHome(object): # Get the installation plugins. Install locally install_plugins = self.get_install_plugin_and_install(repositories, pkgs) if not install_plugins: - self._call_stdio('print', 'You could try using -f to force remove directory') return False - self._call_stdio('print_list', repositories, ['Repository', 'Version', 'Md5'], lambda repository: [repository.name, repository.version, repository.hash], title='Packages') + self._call_stdio( + 'print_list', + repositories, + ['Repository', 'Version', 'Release', 'Md5'], + lambda repository: [repository.name, repository.version, repository.release, repository.hash], + title='Packages' + ) errors = [] self._call_stdio('verbose', 'Repository integrity check') @@ -701,7 +720,6 @@ class ObdHome(object): if unuse_lib_repo and not deploy_config.unuse_lib_repository: deploy_config.set_unuse_lib_repository(True) - lib_not_found_msg_func = 'error' if deploy_config.unuse_lib_repository else 'print' # Get the client ssh_clients = self.get_clients(deploy_config, repositories) @@ -711,7 +729,7 @@ class ObdHome(object): cluster_config = deploy_config.components[repository.name] # cluster files check self.servers_repository_install(ssh_clients, cluster_config.servers, repository, install_plugins[repository]) - # lib check + # lib check msg_lv = 'error' if deploy_config.unuse_lib_repository else 'warn' if not self.servers_repository_lib_check(ssh_clients, cluster_config.servers, repository, install_plugins[repository], msg_lv): need_lib_repositories.append(repository) @@ -793,6 +811,13 @@ class ObdHome(object): # Get the repository repositories = self.load_local_repositories(deploy_config, False) + start_check_plugins = self.search_py_script_plugin(repositories, 'start_check', False) + start_plugins = self.search_py_script_plugin(repositories, 'start') + connect_plugins = self.search_py_script_plugin(repositories, 'connect') + bootstrap_plugins = self.search_py_script_plugin(repositories, 'bootstrap') + display_plugins = self.search_py_script_plugin(repositories, 'display') + self._call_stdio('stop_loading', 'succeed') + # Get the client ssh_clients = self.get_clients(deploy_config, repositories) @@ -804,23 +829,18 @@ class ObdHome(object): self._call_stdio('print', 'Deploy "%s" is running' % name) return True + self._call_stdio('start_loading', 'Cluster param config check') # Check whether the components have the parameter plugins and apply the plugins self.search_param_plugin_and_apply(repositories, deploy_config) # Parameter check - self._call_stdio('verbose', 'Cluster param config check') errors = self.deploy_param_check(repositories, deploy_config) if errors: + self._call_stdio('stop_loading', 'fail') self._call_stdio('error', '\n'.join(errors)) return False - - 
start_check_plugins = self.search_py_script_plugin(repositories, 'start_check', False) - start_plugins = self.search_py_script_plugin(repositories, 'start') - connect_plugins = self.search_py_script_plugin(repositories, 'connect') - bootstrap_plugins = self.search_py_script_plugin(repositories, 'bootstrap') - display_plugins = self.search_py_script_plugin(repositories, 'display') self._call_stdio('stop_loading', 'succeed') - + strict_check = getattr(options, 'strict_check', False) success = True for repository in repositories: @@ -828,23 +848,17 @@ class ObdHome(object): continue cluster_config = deploy_config.components[repository.name] self._call_stdio('verbose', 'Call %s for %s' % (start_check_plugins[repository], repository)) - ret = start_check_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, alert_lv='error' if strict_check else 'warn') + ret = start_check_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, strict_check=strict_check) if not ret: success = False - if strict_check and success is False: + if success is False: # self._call_stdio('verbose', 'Starting check failed. Use --skip-check to skip the starting check. However, this may lead to a starting failure.') return False component_num = len(repositories) for repository in repositories: cluster_config = deploy_config.components[repository.name] - if not deploy_config.unuse_lib_repository: - for server in cluster_config.servers: - client = ssh_clients[server] - remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip() - remote_repository_path = repository.repository_dir.replace(self.home_path, remote_home_path) - client.add_env('LD_LIBRARY_PATH', '%s/lib:' % remote_repository_path, True) self._call_stdio('verbose', 'Call %s for %s' % (start_plugins[repository], repository)) ret = start_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, self.home_path, repository.repository_dir) @@ -854,11 +868,6 @@ class ObdHome(object): self._call_stdio('error', '%s start failed' % repository.name) break - if not deploy_config.unuse_lib_repository: - for server in cluster_config.servers: - client = ssh_clients[server] - client.add_env('LD_LIBRARY_PATH', '', True) - self._call_stdio('verbose', 'Call %s for %s' % (connect_plugins[repository], repository)) ret = connect_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio) if ret: @@ -987,10 +996,10 @@ class ObdHome(object): connect_plugins = self.search_py_script_plugin(repositories, 'connect') display_plugins = self.search_py_script_plugin(repositories, 'display') + self._call_stdio('stop_loading', 'succeed') # Get the client ssh_clients = self.get_clients(deploy_config, repositories) - self._call_stdio('stop_loading', 'succeed') # Check the status for the deployed cluster component_status = {} @@ -1047,10 +1056,10 @@ class ObdHome(object): self.search_param_plugin_and_apply(repositories, deploy_config) stop_plugins = self.search_py_script_plugin(repositories, 'stop') + self._call_stdio('stop_loading', 'succeed') # Get the client ssh_clients = self.get_clients(deploy_config, repositories) - self._call_stdio('stop_loading', 'succeed') component_num = len(repositories) for repository in repositories: @@ -1074,12 +1083,15 @@ class ObdHome(object): deploy_info = deploy.deploy_info self._call_stdio('verbose', 'Check the deploy status') - if deploy_info.status 
== DeployStatus.STATUS_RUNNING and not self.stop_cluster(name): - return False + if deploy_info.status == DeployStatus.STATUS_RUNNING: + if deploy_info.config_status != DeployConfigStatus.UNCHNAGE: + self.reload_cluster(name) + if not self.stop_cluster(name): + return False return self.start_cluster(name) - def redeploy_cluster(self, name): - return self.destroy_cluster(name) and self.deploy_cluster(name) and self.start_cluster(name) + def redeploy_cluster(self, name, opt=Values()): + return self.destroy_cluster(name, opt) and self.deploy_cluster(name) and self.start_cluster(name) def destroy_cluster(self, name, opt=Values()): self._call_stdio('verbose', 'Get Deploy by name') @@ -1107,19 +1119,16 @@ class ObdHome(object): self.search_param_plugin_and_apply(repositories, deploy_config) plugins = self.search_py_script_plugin(repositories, 'destroy') + self._call_stdio('stop_loading', 'succeed') # Get the client ssh_clients = self.get_clients(deploy_config, repositories) - self._call_stdio('stop_loading', 'succeed') # Check the status for the deployed cluster component_status = {} cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) if cluster_status is False or cluster_status == 1: - force_kill = getattr(opt, 'force_kill', False) - msg_lv = 'warn' if force_kill else 'error' - self._call_stdio(msg_lv, 'Some of the servers in the cluster are running') - if force_kill: + if getattr(opt, 'force_kill', False): self._call_stdio('verbose', 'Try to stop cluster') status = deploy.deploy_info.status deploy.update_deploy_status(DeployStatus.STATUS_RUNNING) @@ -1128,13 +1137,13 @@ class ObdHome(object): self._call_stdio('error', 'Fail to stop cluster') return False else: - if self.stdio: - for repository in component_status: - cluster_status = component_status[repository] - for server in cluster_status: - if cluster_status[server] == 1: - self._call_stdio('print', '%s %s is running' % (server, repository.name)) - self._call_stdio('print', 'You could try using -f to force kill process') + self._call_stdio('error', 'Some of the servers in the cluster are running') + for repository in component_status: + cluster_status = component_status[repository] + for server in cluster_status: + if cluster_status[server] == 1: + self._call_stdio('print', '%s %s is running' % (server, repository.name)) + self._call_stdio('print', 'You could try using -f to force kill process') return False for repository in repositories: @@ -1149,6 +1158,156 @@ class ObdHome(object): return True return False + def upgrade_cluster(self, name, opts=Values()): + self._call_stdio('verbose', 'Get Deploy by name') + deploy = self.deploy_manager.get_deploy_config(name) + if not deploy: + self._call_stdio('error', 'No such deploy: %s.' 
% name) + return False + + deploy_info = deploy.deploy_info + self._call_stdio('verbose', 'Check the deploy status') + if deploy_info.status not in [DeployStatus.STATUS_DEPLOYED, DeployStatus.STATUS_STOPPED, DeployStatus.STATUS_RUNNING]: + self._call_stdio('error', 'Deploy "%s" does not need to be upgraded' % (name)) + return False + + components = getattr(opts, 'components', '') + components = components.split(',') if components else deploy_info.components + if not components: + self._call_stdio('print', 'No updates detected') + return True + for component in components: + if component not in deploy_info.components: + self._call_stdio('error', '%s not found in deploy "%s"' % (component, name)) + return False + + force = getattr(opts, 'force', False) + deploy_config = deploy.deploy_config + self._call_stdio('verbose', 'Search package for components...') + upgrade_components = {} + pkgs = [] + repositories = [] + for component in components: + config = deploy_config.components[component] + if config.origin_package_hash: + self._call_stdio('print', 'No updates detected for %s' % component) + continue + package_version = deploy_info.components[component]['version'] + package_hash = deploy_info.components[component]['hash'] + repository = self.repository_manager.get_repository(component, package_version, package_hash) + pkg = self.mirror_manager.get_best_pkg(name=component, version=config.version, tag=config.tag, fuzzy_match=False, only_info=False) + if pkg and pkg > repository: + upgrade_components[component] = { + 'repository': repository, + 'pkg': pkg + } + pkgs.append(pkg) + else: + self._call_stdio('print', 'No updates detected for %s' % repository) + if not upgrade_components: + self._call_stdio('print', 'Nothing to do.') + return True + + for component in upgrade_components: + repository = upgrade_components[component]['repository'] + pkg = upgrade_components[component]['pkg'] + if repository.version != pkg.version: + self._call_stdio('error', '%s %s-%s: no available upgrade path found' % (component, repository.version, pkg.version)) + return False + + install_plugins = self.get_install_plugin_and_install(repositories, pkgs) + if not install_plugins: + return False + + errors = [] + self._call_stdio('verbose', 'Repository integrity check') + for repository in repositories: + if not repository.file_check(install_plugins[repository]): + errors.append('%s install failed' % repository.name) + if errors: + self._call_stdio('error', '\n'.join(errors)) + return False + + # Get the client + ssh_clients = self.get_clients(deploy_config, repositories) + + need_lib_repositories = [] + for repository in repositories: + cluster_config = deploy_config.components[repository.name] + # cluster files check + self.servers_repository_install(ssh_clients, cluster_config.servers, repository, install_plugins[repository]) + # lib check + msg_lv = 'error' if deploy_config.unuse_lib_repository else 'warn' + if not self.servers_repository_lib_check(ssh_clients, cluster_config.servers, repository, install_plugins[repository], msg_lv): + need_lib_repositories.append(repository) + + if need_lib_repositories: + if deploy_config.unuse_lib_repository: + # self._call_stdio('print', 'You could try using -U to work around the problem') + return False + self._call_stdio('print', 'Try to get lib-repository') + repositories_lib_map = self.install_lib_for_repositories(need_lib_repositories) + if repositories_lib_map is False: + self._call_stdio('error', 'Failed to install lib package for local') + return False + if
self.servers_apply_lib_repository_and_check(ssh_clients, deploy_config, need_lib_repositories, repositories_lib_map): + self._call_stdio('error', 'Failed to install lib package for cluster servers') + return False + + # Change bin if cluster is not running + if deploy_info.status != DeployStatus.STATUS_RUNNING: + for repository in repositories: + deploy.use_model(repository.name, repository) + self._call_stdio('print', 'Upgrade to %s successful.' % (repository)) + return True + + component_status = {} + if DeployStatus.STATUS_RUNNING == deploy_info.status: + cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) + if cluster_status != 1: + self._call_stdio('print', 'Cluster status is not uniform. Please run the start or stop command before upgrading') + return False + + if deploy_info.config_status != DeployConfigStatus.UNCHNAGE: + self.reload_cluster(name) + self._call_stdio('start_loading', 'Get plugins for repositories') + upgrade_plugins = self.search_py_script_plugin(repositories, 'upgrade') + start_plugins = self.search_py_script_plugin(repositories, 'start') + connect_plugins = self.search_py_script_plugin(repositories, 'connect') + display_plugins = self.search_py_script_plugin(repositories, 'display') + stop_plugins = self.search_py_script_plugin(repositories, 'stop') + + # Check whether the components have the parameter plugins and apply the plugins + self.search_param_plugin_and_apply(repositories, deploy_config) + + # Parameter check + self._call_stdio('verbose', 'Cluster param configuration check') + errors = self.deploy_param_check(repositories, deploy_config) + if errors: + self._call_stdio('stop_loading', 'fail') + self._call_stdio('error', '\n'.join(errors)) + return False + self._call_stdio('stop_loading', 'succeed') + + component_num = len(repositories) + for repository in repositories: + cluster_config = deploy_config.components[repository.name] + self._call_stdio('verbose', 'Call %s for %s' % (upgrade_plugins[repository], repository)) + if upgrade_plugins[repository]( + deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, + stop_plugins[repository], + start_plugins[repository], + connect_plugins[repository], + display_plugins[repository], + local_home_path=self.home_path, + repository_dir=repository.repository_dir + ): + deploy.use_model(repository.name, repository) + self._call_stdio('print', 'Upgrade to %s successful.'
% (repository)) + component_num -= 1 + + return component_num == 0 + def create_repository(self, options): force = getattr(options, 'force', False) necessary = ['name', 'version', 'path'] @@ -1177,7 +1336,7 @@ class ObdHome(object): path = os.path.join(repo_path, item.target_path) path = os.path.normcase(path) if not os.path.exists(path): - self._call_stdio('error', 'need file: %s ' % path) + self._call_stdio('error', 'need %s: %s ' % ('dir' if item.type == InstallPlugin.FileItemType.DIR else 'file', path)) success = False continue files[item.src_path] = path @@ -1264,10 +1423,10 @@ class ObdHome(object): # Check whether the components have the parameter plugins and apply the plugins self.search_param_plugin_and_apply(repositories, deploy_config) + self._call_stdio('stop_loading', 'succeed') # Get the client ssh_clients = self.get_clients(deploy_config, repositories) - self._call_stdio('stop_loading', 'succeed') # Check the status for the deployed cluster component_status = {} @@ -1374,3 +1533,22 @@ class ObdHome(object): self._call_stdio('print', 'Mysqltest passed') return True return False + + def update_obd(self, version): + component_name = 'ob-deploy' + plugin = self.plugin_manager.get_best_plugin(PluginType.INSTALL, component_name, '1.0.0') + if not plugin: + self._call_stdio('critical', 'OBD upgrade plugin not found') + return False + pkg = self.mirror_manager.get_best_pkg(name=component_name) + if not (pkg and pkg > PackageInfo(component_name, version, pkg.release, pkg.arch, '')): + self._call_stdio('print', 'No updates detected. OBD is already up to date.') + return False + + self._call_stdio('print', 'Found a higher version package for OBD\n%s' % pkg) + repository = self.repository_manager.create_instance_repository(pkg.name, pkg.version, pkg.md5) + repository.load_pkg(pkg, plugin) + if DirectoryUtil.copy(repository.repository_dir, '/', self.stdio): + self._call_stdio('print', 'Upgrade successful.\nCurrent version: %s' % pkg.version) + return True + return False diff --git a/example/distributed-example.yaml b/example/distributed-example.yaml index 11bcf1c..94bf4b4 100644 --- a/example/distributed-example.yaml +++ b/example/distributed-example.yaml @@ -26,6 +26,7 @@ oceanbase-ce: enable_syslog_recycle: true max_syslog_file_count: 4 cluster_id: 1 + root_password: # root user password # In this example , support multiple ob process in single node, so different process use different ports. # If deploy ob cluster in multiple nodes, the port and path setting can be same. z1: diff --git a/example/distributed-with-obproxy-example.yaml b/example/distributed-with-obproxy-example.yaml index e90beab..9456fb5 100644 --- a/example/distributed-with-obproxy-example.yaml +++ b/example/distributed-with-obproxy-example.yaml @@ -26,8 +26,8 @@ oceanbase-ce: enable_syslog_recycle: true max_syslog_file_count: 4 cluster_id: 1 - # observer cluster name, consistent with obproxy's cluster_name - appname: observer_cluster_name + root_password: # root user password, can be empty + proxyro_password: # proxyro user password, consistent with obproxy's observer_sys_password, can be empty # In this example , support multiple ob process in single node, so different process use different ports. # If deploy ob cluster in multiple nodes, the port and path setting can be same.
z1: @@ -56,5 +56,5 @@ obproxy: # format: ip:mysql_port,ip:mysql_port rs_list: 192.168.1.2:2881;192.168.1.3:2881;192.168.1.4:2881 enable_cluster_checkout: false - # observer cluster name, consistent with oceanbase-ce's appname - cluster_name: observer_cluster_name \ No newline at end of file + obproxy_sys_password: # obproxy sys user password, can be empty + observer_sys_password: # proxyro user password, consistent with oceanbase-ce's proxyro_password, can be empty \ No newline at end of file diff --git a/example/local-example.yaml b/example/local-example.yaml index 2a74fa2..555dd79 100644 --- a/example/local-example.yaml +++ b/example/local-example.yaml @@ -17,4 +17,5 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 - cluster_id: 1 \ No newline at end of file + cluster_id: 1 + root_password: # root user password, can be empty \ No newline at end of file diff --git a/example/mini-distributed-example.yaml b/example/mini-distributed-example.yaml index 0742c6c..14479d4 100644 --- a/example/mini-distributed-example.yaml +++ b/example/mini-distributed-example.yaml @@ -39,6 +39,7 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 + root_password: # root user password, can be empty z1: mysql_port: 2881 rpc_port: 2882 diff --git a/example/mini-distributed-with-obproxy-example.yaml b/example/mini-distributed-with-obproxy-example.yaml index 5ccfdce..221d6a9 100644 --- a/example/mini-distributed-with-obproxy-example.yaml +++ b/example/mini-distributed-with-obproxy-example.yaml @@ -39,8 +39,8 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 - # observer cluster name, consistent with obproxy's cluster_name - appname: observer_cluster_name + root_password: # root user password, can be empty + proxyro_password: # proxyro user password, consistent with obproxy's observer_sys_password, can be empty z1: mysql_port: 2881 rpc_port: 2882 @@ -67,5 +67,5 @@ obproxy: # format: ip:mysql_port,ip:mysql_port rs_list: 192.168.1.2:2881;192.168.1.3:2881;192.168.1.4:2881 enable_cluster_checkout: false - # observer cluster name, consistent with oceanbase-ce's appname - cluster_name: observer_cluster_name \ No newline at end of file + obproxy_sys_password: # obproxy sys user password, can be empty + observer_sys_password: # proxyro user password, consistent with oceanbase-ce's proxyro_password, can be empty \ No newline at end of file diff --git a/example/mini-local-example.yaml b/example/mini-local-example.yaml index 399dc33..491f6b2 100755 --- a/example/mini-local-example.yaml +++ b/example/mini-local-example.yaml @@ -31,3 +31,4 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 + root_password: # root user password, can be empty diff --git a/example/mini-single-example.yaml b/example/mini-single-example.yaml index cccae8c..3c3813b 100755 --- a/example/mini-single-example.yaml +++ b/example/mini-single-example.yaml @@ -38,3 +38,4 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 + root_password: # root user password, can be empty diff --git a/example/mini-single-with-obproxy-example.yaml b/example/mini-single-with-obproxy-example.yaml index 74a63cb..c22a5e7 100644 --- a/example/mini-single-with-obproxy-example.yaml +++ b/example/mini-single-with-obproxy-example.yaml @@ -38,8 +38,8 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 - # observer cluster name, consistent with obproxy's cluster_name
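Note: every obproxy example above pairs oceanbase-ce's proxyro_password with obproxy's observer_sys_password; if the two drift apart, obproxy cannot log in to the observer as proxyro. A quick standalone sanity check, sketched under the assumption that both values sit under each component's global section as in these examples (hypothetical helper, uses PyYAML):

import yaml

def passwords_match(config_path):
    # obproxy authenticates to observer as proxyro, so the two settings must agree
    with open(config_path) as f:
        conf = yaml.safe_load(f)
    ob = (conf.get('oceanbase-ce') or {}).get('global') or {}
    proxy = (conf.get('obproxy') or {}).get('global') or {}
    return ob.get('proxyro_password') == proxy.get('observer_sys_password')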
- appname: observer_cluster_name + root_password: # root user password, can be empty + proxyro_password: # proxyro user password, consistent with obproxy's observer_sys_password, can be empty obproxy: servers: - 192.168.1.2 @@ -51,5 +51,5 @@ obproxy: # format: ip:mysql_port,ip:mysql_port rs_list: 192.168.1.3:2881 enable_cluster_checkout: false - # observer cluster name, consistent with oceanbase-ce's appname - cluster_name: observer_cluster_name \ No newline at end of file + obproxy_sys_password: # obproxy sys user password, can be empty + observer_sys_password: # proxyro user password, consistent with oceanbase-ce's proxyro_password, can be empty \ No newline at end of file diff --git a/example/single-example.yaml b/example/single-example.yaml index fe1de55..bd6bade 100644 --- a/example/single-example.yaml +++ b/example/single-example.yaml @@ -24,4 +24,5 @@ oceanbase-ce: syslog_level: INFO enable_syslog_recycle: true max_syslog_file_count: 4 - cluster_id: 1 \ No newline at end of file + cluster_id: 1 + root_password: # root user password, can be empty \ No newline at end of file diff --git a/example/single-with-obproxy-example.yaml b/example/single-with-obproxy-example.yaml index f0ab712..91f679f 100644 --- a/example/single-with-obproxy-example.yaml +++ b/example/single-with-obproxy-example.yaml @@ -14,13 +14,10 @@ oceanbase-ce: # Please set devname as the network adaptor's name whose ip is in the setting of severs. # if set severs as "127.0.0.1", please set devname as "lo" # if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0" + devname: eth0 mysql_port: 2881 rpc_port: 2882 zone: zone1 - # Please set devname as the network adaptor's name whose ip is in the setting of severs. - # if set severs as "127.0.0.1", please set devname as "lo" - # if current ip is 192.168.1.10, and the ip's network adaptor's name is "eth0", please use "eth0" - devname: eth0 # if current hardware's memory capacity is smaller than 50G, please use the setting of "mini-single-example.yaml" and do a small adjustment.
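Note: the devname comments above ask the user to find by hand which network interface owns the server ip. A hypothetical helper that automates the lookup on Linux, assuming the iproute2 `ip` tool is available:

import subprocess

def guess_devname(server_ip):
    # return the interface whose IPv4 address equals server_ip,
    # e.g. 'lo' for 127.0.0.1, per the comments above
    out = subprocess.check_output(['ip', '-o', '-4', 'addr'], universal_newlines=True)
    for line in out.splitlines():
        fields = line.split()  # e.g. ['2:', 'eth0', 'inet', '192.168.1.10/24', ...]
        if fields[3].split('/')[0] == server_ip:
            return fields[1]
    return None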
memory_limit: 64G datafile_disk_percentage: 20 @@ -28,8 +25,8 @@ oceanbase-ce: enable_syslog_recycle: true max_syslog_file_count: 4 cluster_id: 1 - # observer cluster name, consistent with obproxy's cluster_name - appname: observer_cluster_name + root_password: # root user password, can be empty + proxyro_password: # proxyro user password, consistent with obproxy's observer_sys_password, can be empty obproxy: servers: - 192.168.1.2 @@ -41,5 +38,5 @@ obproxy: # format: ip:mysql_port,ip:mysql_port rs_list: 192.168.1.3:2881 enable_cluster_checkout: false - # observer cluster name, consistent with oceanbase-ce's appname - cluster_name: observer_cluster_name \ No newline at end of file + obproxy_sys_password: # obproxy sys user password, can be empty + observer_sys_password: # proxyro user password, consistent with oceanbase-ce's proxyro_password, can be empty \ No newline at end of file diff --git a/ob-deploy.spec b/ob-deploy.spec new file mode 100644 index 0000000..d5420ec --- /dev/null +++ b/ob-deploy.spec @@ -0,0 +1,108 @@ +Name: ob-deploy +Version: 1.0.1 +Release: 1%{?dist} +# if you want to use the parameters of rpm_create at build time, +# uncomment below +Summary: ob-deploy +Group: Development/Tools +License: GPL +Url: git@github.com:oceanbase/obdeploy.git +BuildRoot: %_topdir/BUILDROOT %define debug_package %{nil} %define __os_install_post %{nil} + + +# uncomment below, if your build depends on other packages + +# uncomment below, if it depends on other packages + +Autoreq: 0 # BuildRequires: mariadb-devel %description # if you want to publish the current svn URL or Revision, use these macros ob-deploy %debug_package # support debuginfo package, to reduce runtime package size # prepare your files # OLDPWD is the dir where rpm_create runs # _prefix is an inner var of rpmbuild, # can be set by rpm_create, default is "/home/a" # _lib is an inner var, maybe "lib" or "lib64" depending on OS # create dirs %install SRC_DIR=$OLDPWD BUILD_DIR=$OLDPWD/rpmbuild rm -fr $SRC_DIR/mirror/remote && mkdir -p $SRC_DIR/mirror/remote && cd $SRC_DIR/mirror/remote wget https://mirrors.aliyun.com/oceanbase/OceanBase.repo cd $SRC_DIR/ rm -rf $BUILD_DIR build.log ${RPM_BUILD_ROOT} build dist obd.spec CID=`git log |head -n1 | awk -F' ' '{print $2}'` BRANCH=`git branch | grep -e "^\*" | awk -F' ' '{print $2}'` DATE=`date '+%b %d %Y %H:%M:%S'` cat _cmd.py | sed "s//$CID/" | sed "s//$BRANCH/" | sed "s//$DATE/" > obd.py mkdir -p $BUILD_DIR/SOURCES ${RPM_BUILD_ROOT} mkdir -p $BUILD_DIR/SOURCES/{site-packages} mkdir -p ${RPM_BUILD_ROOT}/usr/bin mkdir -p ${RPM_BUILD_ROOT}/usr/obd pip install -r plugins-requirements3.txt --target=$BUILD_DIR/SOURCES/site-packages pyinstaller --hidden-import=decimal --hidden-import=configparser -F obd.py rm -f obd.py obd.spec \cp -rf $SRC_DIR/dist/obd ${RPM_BUILD_ROOT}/usr/bin/obd \cp -rf $SRC_DIR/plugins $BUILD_DIR/SOURCES/plugins \rm -fr $BUILD_DIR/SOURCES/plugins/oceanbase-ce \cp -rf $SRC_DIR/profile/ $BUILD_DIR/SOURCES/ \cp -rf $SRC_DIR/mirror/ $BUILD_DIR/SOURCES/ \cp -rf $BUILD_DIR/SOURCES/plugins ${RPM_BUILD_ROOT}/usr/obd/ \cp -rf $BUILD_DIR/SOURCES/mirror ${RPM_BUILD_ROOT}/usr/obd/ mkdir -p ${RPM_BUILD_ROOT}/etc/profile.d/ \cp -rf $BUILD_DIR/SOURCES/profile/* ${RPM_BUILD_ROOT}/etc/profile.d/ mkdir -p ${RPM_BUILD_ROOT}/usr/obd/lib/ \cp -rf $BUILD_DIR/SOURCES/site-packages ${RPM_BUILD_ROOT}/usr/obd/lib/site-packages cd ${RPM_BUILD_ROOT}/usr/obd/plugins && ln -s oceanbase oceanbase-ce # package information %files # set file attribute here
+%defattr(-,root,root,0777) # no need to list every file here, keep it as is /usr/bin/obd /usr/obd/* /etc/profile.d/* ## create an empty dir + + ## need to back up old config files, so indicate here + + ## or need to keep old config files, so indicate with "noreplace" ## indicate the dir for crontab + + %post # chkconfig: 2345 10 90 # description: obd .... chmod +x /usr/bin/obd #mkdir -p /usr/obd/ && cp -rf /root/.obd/plugins /usr/obd/plugins #chmod 744 /root/.obd/plugins/* chmod -R 755 /usr/obd/* chown -R root:root /usr/obd/* find /usr/obd -type f -exec chmod 644 {} \; echo -e 'Installation of obd finished successfully\nPlease source /etc/profile.d/obd.sh to enable it' #/sbin/chkconfig --add obd #/sbin/chkconfig obd on %changelog * Mon Jun 28 2021 obd 1.0.1 + - Support setting passwords in configurations - Multi-level checks before start - New feature: obd cluster upgrade - New feature: obd update - Remove the timeout limit when waiting for the cluster to initialize - New configuration items for log storage - Support SUSE, Ubuntu, etc. \ No newline at end of file diff --git a/plugins/obproxy/3.1.0/bootstrap.py b/plugins/obproxy/3.1.0/bootstrap.py index 9b55c16..dccf1b4 100644 --- a/plugins/obproxy/3.1.0/bootstrap.py +++ b/plugins/obproxy/3.1.0/bootstrap.py @@ -30,7 +30,7 @@ def bootstrap(plugin_context, cursor, *args, **kwargs): if key in server_config and server_config[key]: try: sql = 'alter proxyconfig set %s = %%s' % key - value = server_config[key] + value = str(server_config[key]) stdio.verbose('execute sql: %s' % (sql % value)) cursor[server].execute(sql, [value]) except: diff --git a/plugins/obproxy/3.1.0/connect.py b/plugins/obproxy/3.1.0/connect.py index e114563..4392722 100644 --- a/plugins/obproxy/3.1.0/connect.py +++ b/plugins/obproxy/3.1.0/connect.py @@ -28,17 +28,22 @@ else: import pymysql as mysql -def _connect(ip, port, user): +stdio = None + + +def _connect(ip, port, user, password=''): + stdio.verbose('connect %s -P%s -u%s -p%s' % (ip, port, user, password)) if sys.version_info.major == 2: - db = mysql.connect(host=ip, user=user, port=int(port)) + db = mysql.connect(host=ip, user=user, port=int(port), passwd=str(password)) cursor = db.cursor(cursorclass=mysql.cursors.DictCursor) else: - db = mysql.connect(host=ip, user=user, port=int(port), cursorclass=mysql.cursors.DictCursor) + db = mysql.connect(host=ip, user=user, port=int(port), password=str(password), cursorclass=mysql.cursors.DictCursor) cursor = db.cursor() return db, cursor def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs): + global stdio count = 10 cluster_config = plugin_context.cluster_config stdio = plugin_context.stdio @@ -61,7 +66,8 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs): for server in servers: try: server_config = cluster_config.get_server_conf(server) - db, cursor = _connect(server.ip, server_config['listen_port'], user) + pwd_key = 'obproxy_sys_password' if sys_root else 'observer_sys_password' + db, cursor = _connect(server.ip, server_config['listen_port'], user, server_config.get(pwd_key, '') if count % 2 else '') dbs[server] = db cursors[server] = cursor except: @@ -70,7 +76,7 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs): servers = tmp_servers servers and time.sleep(3) - if count and servers: + if servers: stdio.stop_loading('fail') return plugin_context.return_false() else: diff --git a/plugins/obproxy/3.1.0/destroy.py b/plugins/obproxy/3.1.0/destroy.py index e27c381..58c0618
100644 --- a/plugins/obproxy/3.1.0/destroy.py +++ b/plugins/obproxy/3.1.0/destroy.py @@ -28,7 +28,7 @@ def destroy(plugin_context, *args, **kwargs): if not ret: # pring stderror global_ret = False - stdio.warn('fail to clean %s:%s', (server, path)) + stdio.warn('fail to clean %s:%s' % (server, path)) else: stdio.verbose('%s:%s cleaned' % (server, path)) cluster_config = plugin_context.cluster_config @@ -38,7 +38,7 @@ def destroy(plugin_context, *args, **kwargs): stdio.start_loading('obproxy work dir cleaning') for server in cluster_config.servers: server_config = cluster_config.get_server_conf(server) - stdio.verbose('%s work path cleaning', server) + stdio.verbose('%s work path cleaning' % server) clean(server, server_config['home_path']) if global_ret: stdio.stop_loading('succeed') diff --git a/plugins/obproxy/3.1.0/start.py b/plugins/obproxy/3.1.0/start.py index 6fd0423..8e06a08 100644 --- a/plugins/obproxy/3.1.0/start.py +++ b/plugins/obproxy/3.1.0/start.py @@ -29,7 +29,7 @@ stdio = None def get_port_socket_inode(client, port): port = hex(port)[2:].zfill(4).upper() - cmd = "cat /proc/net/{tcp,udp} | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port + cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port res = client.execute_command(cmd) if not res or not res.stdout.strip(): return False @@ -77,7 +77,7 @@ def is_started(client, remote_bin_path, port, home_path, command): return False return confirm_home_path(client, pid, home_path) and confirm_command(client, pid, command) -def start(plugin_context, home_path, repository_dir, *args, **kwargs): +def start(plugin_context, local_home_path, repository_dir, *args, **kwargs): global stdio cluster_config = plugin_context.cluster_config clients = plugin_context.clients @@ -99,11 +99,14 @@ def start(plugin_context, home_path, repository_dir, *args, **kwargs): if error: return plugin_context.return_false() + servers_remote_home_path = {} stdio.start_loading('Start obproxy') for server in cluster_config.servers: client = clients[server] remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip() - remote_bin_path[server] = bin_path.replace(home_path, remote_home_path) + servers_remote_home_path[server] = remote_home_path + remote_bin_path[server] = bin_path.replace(local_home_path, remote_home_path) + server_config = cluster_config.get_server_conf(server) pid_path[server] = "%s/run/obproxy-%s-%s.pid" % (server_config['home_path'], server.ip, server_config["listen_port"]) @@ -113,10 +116,11 @@ def start(plugin_context, home_path, repository_dir, *args, **kwargs): 'rs_list', 'cluster_name' ] + start_unuse = ['home_path', 'observer_sys_password', 'obproxy_sys_password'] get_value = lambda key: "'%s'" % server_config[key] if isinstance(server_config[key], str) else server_config[key] opt_str = [] for key in server_config: - if key != 'home_path' and key not in not_opt_str: + if key not in start_unuse and key not in not_opt_str: value = get_value(key) opt_str.append('%s=%s' % (key, value)) cmd = ['-o %s' % ','.join(opt_str)] @@ -138,14 +142,17 @@ def start(plugin_context, home_path, repository_dir, *args, **kwargs): if remote_pid: ret = client.execute_command('cat /proc/%s/cmdline' % remote_pid) if ret: - if ret.stdout.strip() == cmd: + if ret.stdout.replace('\0', '') == cmd.strip().replace(' ', ''): continue stdio.stop_loading('fail') stdio.error('%s:%s port is already used' % (server.ip, port)) return 
plugin_context.return_false() stdio.verbose('starting %s obproxy', server) + remote_repository_path = repository_dir.replace(local_home_path, remote_home_path) + client.add_env('LD_LIBRARY_PATH', '%s/lib:' % remote_repository_path, True) ret = client.execute_command(clusters_cmd[server]) + client.add_env('LD_LIBRARY_PATH', '', True) if not ret: stdio.stop_loading('fail') stdio.error('failed to start %s obproxy: %s' % (server, ret.stderr)) diff --git a/plugins/obproxy/3.1.0/start_check.py b/plugins/obproxy/3.1.0/start_check.py index a816d52..9954ff2 100644 --- a/plugins/obproxy/3.1.0/start_check.py +++ b/plugins/obproxy/3.1.0/start_check.py @@ -22,11 +22,12 @@ from __future__ import absolute_import, division, print_function stdio = None +success = True def get_port_socket_inode(client, port): port = hex(port)[2:].zfill(4).upper() - cmd = "cat /proc/net/{tcp,udp} | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port + cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port res = client.execute_command(cmd) if not res or not res.stdout.strip(): return False @@ -34,17 +35,36 @@ def get_port_socket_inode(client, port): return res.stdout.strip().split('\n') -def start_check(plugin_context, alert_lv='error', *args, **kwargs): +def start_check(plugin_context, strict_check=False, *args, **kwargs): + def alert(*arg, **kwargs): + global success + if strict_check: + success = False + stdio.error(*arg, **kwargs) + else: + stdio.warn(*arg, **kwargs) + def critical(*arg, **kwargs): + global success + success = False + stdio.error(*arg, **kwargs) global stdio cluster_config = plugin_context.cluster_config clients = plugin_context.clients stdio = plugin_context.stdio - success = True - alert = getattr(stdio, alert_lv) servers_port = {} + stdio.start_loading('Check before start obproxy') for server in cluster_config.servers: ip = server.ip client = clients[server] + server_config = cluster_config.get_server_conf(server) + port = int(server_config["listen_port"]) + prometheus_port = int(server_config["prometheus_listen_port"]) + remote_pid_path = "%s/run/obproxy-%s-%s.pid" % (server_config['home_path'], server.ip, server_config["listen_port"]) + remote_pid = client.execute_command("cat %s" % remote_pid_path).stdout.strip() + if remote_pid: + if client.execute_command('ls /proc/%s' % remote_pid): + continue + if ip not in servers_port: servers_port[ip] = {} ports = servers_port[ip] @@ -53,15 +73,18 @@ def start_check(plugin_context, alert_lv='error', *args, **kwargs): for key in ['listen_port', 'prometheus_listen_port']: port = int(server_config[key]) if port in ports: - alert('%s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key'])) - success = False + alert_f = alert if key == 'prometheus_listen_port' else critical + alert_f('Configuration conflict %s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key'])) continue ports[port] = { 'server': server, 'key': key } if get_port_socket_inode(client, port): - alert('%s:%s port is already used' % (ip, port)) - success = False + critical('%s:%s port is already used' % (ip, port)) + if success: - plugin_context.return_true() \ No newline at end of file + stdio.stop_loading('succeed') + plugin_context.return_true() + else: + stdio.stop_loading('fail') \ No newline at end of file diff --git a/plugins/obproxy/3.1.0/stop.py b/plugins/obproxy/3.1.0/stop.py index 8b0e96e..2bc9243 100644 --- 
a/plugins/obproxy/3.1.0/stop.py +++ b/plugins/obproxy/3.1.0/stop.py @@ -28,7 +28,7 @@ stdio = None def get_port_socket_inode(client, port): port = hex(port)[2:].zfill(4).upper() - cmd = "cat /proc/net/{tcp,udp} | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port + cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port res = client.execute_command(cmd) inode = res.stdout.strip() if not res or not inode: return False diff --git a/plugins/obproxy/3.1.0/upgrade.py b/plugins/obproxy/3.1.0/upgrade.py new file mode 100644 index 0000000..ebd52bd --- /dev/null +++ b/plugins/obproxy/3.1.0/upgrade.py @@ -0,0 +1,41 @@ +# coding: utf-8 +# OceanBase Deploy. +# Copyright (C) 2021 OceanBase +# +# This file is part of OceanBase Deploy. +# +# OceanBase Deploy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OceanBase Deploy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>. + + +from __future__ import absolute_import, division, print_function + + + + +def upgrade(plugin_context, stop_plugin, start_plugin, connect_plugin, display_plugin, *args, **kwargs): + components = plugin_context.components + clients = plugin_context.clients + cluster_config = plugin_context.cluster_config + cmd = plugin_context.cmd + options = plugin_context.options + stdio = plugin_context.stdio + + if not stop_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs): + return + if not start_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs): + return + + ret = connect_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs) + if ret and display_plugin(components, clients, cluster_config, cmd, options, stdio, ret.get_return('cursor'), *args, **kwargs): + return plugin_context.return_true() diff --git a/plugins/oceanbase/3.1.0/bootstrap.py b/plugins/oceanbase/3.1.0/bootstrap.py index 447f6b7..b675b10 100644 --- a/plugins/oceanbase/3.1.0/bootstrap.py +++ b/plugins/oceanbase/3.1.0/bootstrap.py @@ -55,13 +55,19 @@ def bootstrap(plugin_context, cursor, *args, **kwargs): sql = 'grant select on oceanbase.* to proxyro IDENTIFIED BY "%s"' % value stdio.verbose(sql) cursor.execute(sql) + if global_conf.get('root_password'): + sql = 'alter user "root" IDENTIFIED BY "%s"' % global_conf.get('root_password') + stdio.verbose('execute sql: %s' % sql) + cursor.execute(sql) stdio.stop_loading('succeed') plugin_context.return_true() except: stdio.exception('') try: - cursor.execute('select * from oceanbase.__all_server') - servers = cursor.fetchall() + cursor.execute('select * from oceanbase.__all_rootservice_event_history where module = "bootstrap" and event = "bootstrap_succeed"') + event = cursor.fetchall() + if not event: + raise Exception('bootstrap_succeed event not found') stdio.stop_loading('succeed') plugin_context.return_true() except: diff --git a/plugins/oceanbase/3.1.0/connect.py b/plugins/oceanbase/3.1.0/connect.py index 51bc6fb..5b47259 100644 ---
a/plugins/oceanbase/3.1.0/connect.py +++ b/plugins/oceanbase/3.1.0/connect.py @@ -28,17 +28,23 @@ else: import pymysql as mysql -def _connect(ip, port): +stdio = None + + +def _connect(ip, port, password=''): + user = 'root' + stdio.verbose('connect %s -P%s -u%s -p%s' % (ip, port, user, password)) if sys.version_info.major == 2: - db = mysql.connect(host=ip, user='root', port=port) + db = mysql.connect(host=ip, user=user, port=int(port), passwd=str(password)) cursor = db.cursor(cursorclass=mysql.cursors.DictCursor) else: - db = mysql.connect(host=ip, user='root', port=port, cursorclass=mysql.cursors.DictCursor) + db = mysql.connect(host=ip, user=user, port=int(port), password=str(password), cursorclass=mysql.cursors.DictCursor) cursor = db.cursor() return db, cursor def connect(plugin_context, target_server=None, *args, **kwargs): + global stdio count = 10 cluster_config = plugin_context.cluster_config stdio = plugin_context.stdio @@ -54,7 +60,8 @@ def connect(plugin_context, target_server=None, *args, **kwargs): for server in servers: try: server_config = cluster_config.get_server_conf(server) - db, cursor = _connect(server.ip, server_config['mysql_port']) + password = server_config.get('root_password', '') if count % 2 else '' + db, cursor = _connect(server.ip, server_config['mysql_port'], password) stdio.stop_loading('succeed') return plugin_context.return_true(connect=db, cursor=cursor) except: diff --git a/plugins/oceanbase/3.1.0/display.py b/plugins/oceanbase/3.1.0/display.py index 5ae5281..b565f92 100644 --- a/plugins/oceanbase/3.1.0/display.py +++ b/plugins/oceanbase/3.1.0/display.py @@ -26,22 +26,23 @@ from prettytable import PrettyTable def display(plugin_context, cursor, *args, **kwargs): - count = 10 stdio = plugin_context.stdio stdio.start_loading('Wait for observer init') - while count > 0: - try: - cursor.execute('select * from oceanbase.__all_server') - servers = cursor.fetchall() - if servers: - stdio.print_list(servers, ['ip', 'version', 'port', 'zone', 'status'], - lambda x: [x['svr_ip'], x['build_version'].split('_')[0], x['inner_port'], x['zone'], x['status']], title='observer') - stdio.stop_loading('succeed') - return plugin_context.return_true() - except Exception as e: - if e.args[0] != 1146: - raise e - count -= 1 - time.sleep(3) - stdio.stop_loading('fail', 'observer need bootstarp') + try: + while True: + try: + cursor.execute('select * from oceanbase.__all_server') + servers = cursor.fetchall() + if servers: + stdio.print_list(servers, ['ip', 'version', 'port', 'zone', 'status'], + lambda x: [x['svr_ip'], x['build_version'].split('_')[0], x['inner_port'], x['zone'], x['status']], title='observer') + stdio.stop_loading('succeed') + return plugin_context.return_true() + except Exception as e: + if e.args[0] != 1146: + raise e + time.sleep(3) + except: + stdio.stop_loading('fail', 'observer needs bootstrap') + stdio.exception('') plugin_context.return_false() diff --git a/plugins/oceanbase/3.1.0/init.py b/plugins/oceanbase/3.1.0/init.py index 9115c15..92b9f78 100644 --- a/plugins/oceanbase/3.1.0/init.py +++ b/plugins/oceanbase/3.1.0/init.py @@ -19,60 +19,136 @@ from __future__ import absolute_import, division, print_function +import os + + +stdio = None +force = False +global_ret = True + + +def critical(*arg, **kwargs): + global global_ret + global_ret = False + stdio.error(*arg, **kwargs) + +def init_dir(server, client, key, path, link_path=None): + if force: + ret = client.execute_command('rm -fr %s/*' % path) + if not ret: + critical('fail to initialize %s %s
path: %s permission denied' % (server, key, ret.stderr)) + return False + else: + if client.execute_command('mkdir -p %s' % path): + ret = client.execute_command('ls %s' % (path)) + if not ret or ret.stdout.strip(): + critical('fail to initialize %s %s path: %s is not empty' % (server, key, path)) + return False + else: + critical('fail to initialize %s %s path: create %s failed' % (server, key, path)) + return False + ret = client.execute_command('mkdir -p %s' % path) + if ret: + if link_path: + client.execute_command("if [ '%s' -ef '%s' ]; then ln -sf %s %s; fi" % (path, link_path, path, link_path)) + return True + else: + critical('fail to initialize %s %s path: %s permission denied' % (server, key, ret.stderr)) + return False def init(plugin_context, *args, **kwargs): + global stdio, force cluster_config = plugin_context.cluster_config clients = plugin_context.clients stdio = plugin_context.stdio - global_ret = True + servers_dirs = {} force = getattr(plugin_context.options, 'force', False) stdio.verbose('option `force` is %s' % force) for server in cluster_config.servers: + ip = server.ip + if ip not in servers_dirs: + servers_dirs[ip] = {} + dirs = servers_dirs[ip] server_config = cluster_config.get_server_conf(server) client = clients[server] home_path = server_config['home_path'] - stdio.print('%s initializes cluster work home', server) + if 'data_dir' not in server_config: + server_config['data_dir'] = '%s/store' % home_path + if 'clog_dir' not in server_config: + server_config['clog_dir'] = '%s/clog' % server_config['data_dir'] + if 'ilog_dir' not in server_config: + server_config['ilog_dir'] = '%s/ilog' % server_config['data_dir'] + if 'slog_dir' not in server_config: + server_config['slog_dir'] = '%s/slog' % server_config['data_dir'] + for key in ['home_path', 'data_dir', 'clog_dir', 'ilog_dir', 'slog_dir']: + path = server_config[key] + if path in dirs: + critical('Configuration conflict %s: %s is used for %s\'s %s' % (server, path, dirs[path]['server'], dirs[path]['key'])) + continue + dirs[path] = { + 'server': server, + 'key': key, + } + + stdio.print('%s initializes cluster work home' % server) if force: ret = client.execute_command('rm -fr %s/*' % home_path) if not ret: - global_ret = False - stdio.error('failed to initialize %s home path: %s' % (server, ret.stderr)) + critical('failed to initialize %s home path: %s' % (server, ret.stderr)) continue else: if client.execute_command('mkdir -p %s' % home_path): ret = client.execute_command('ls %s' % (home_path)) if not ret or ret.stdout.strip(): - global_ret = False - stdio.error('fail to init %s home path: %s is not empty' % (server, home_path)) + critical('fail to init %s home path: %s is not empty' % (server, home_path)) continue else: - stdio.error('fail to init %s home path: create %s failed' % (server, home_path)) - ret = client.execute_command('mkdir -p %s/{etc,admin,.conf,log}' % home_path) + critical('fail to init %s home path: create %s failed' % (server, home_path)) + ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log}"' % home_path) if ret: - data_path = server_config['data_dir'] if 'data_dir' in server_config else '%s/store' % home_path + data_path = server_config['data_dir'] if force: ret = client.execute_command('rm -fr %s/*' % data_path) if not ret: - global_ret = False - stdio.error('fail to init %s data path: %s permission denied' % (server, ret.stderr)) + critical('fail to init %s data path: %s permission denied' % (server, ret.stderr)) continue else: if client.execute_command('mkdir 
-p %s' % data_path): ret = client.execute_command('ls %s' % (data_path)) if not ret or ret.stdout.strip(): - global_ret = False - stdio.error('fail to init %s data path: %s is not empty' % (server, data_path)) + critical('fail to init %s data path: %s is not empty' % (server, data_path)) continue else: - stdio.error('fail to init %s data path: create %s failed' % (server, data_path)) + critical('fail to init %s data path: create %s failed' % (server, data_path)) - ret = client.execute_command('mkdir -p %s/{sstable,clog,ilog,slog}' % data_path) + ret = client.execute_command('mkdir -p %s/sstable' % data_path) if ret: - data_path != '%s/store' % home_path and client.execute_command('ln -sf %s %s/store' % (data_path, home_path)) + link_path = '%s/store' % home_path + client.execute_command("if [ '%s' -ef '%s' ]; then ln -sf %s %s; fi" % (data_path, link_path, data_path, link_path)) + for key in ['clog', 'ilog', 'slog']: + # init_dir(server, client, key, server_config['%s_dir' % key], os.path.join(data_path, key)) + log_dir = server_config['%s_dir' % key] + if force: + ret = client.execute_command('rm -fr %s/*' % log_dir) + if not ret: + critical('fail to init %s %s dir: %s permission denied' % (server, key, ret.stderr)) + continue + else: + if client.execute_command('mkdir -p %s' % log_dir): + ret = client.execute_command('ls %s' % (log_dir)) + if not ret or ret.stdout.strip(): + critical('fail to init %s %s dir: %s is not empty' % (server, key, log_dir)) + continue + else: + critical('fail to init %s %s dir: create %s failed' % (server, key, log_dir)) + ret = client.execute_command('mkdir -p %s' % log_dir) + if ret: + link_path = '%s/%s' % (data_path, key) + client.execute_command("if [ '%s' -ef '%s' ]; then ln -sf %s %s; fi" % (log_dir, link_path, log_dir, link_path)) + else: + critical('failed to initialize %s %s dir' % (server, key)) else: - global_ret = False - stdio.error('failed to initialize %s date path', server) + critical('failed to initialize %s data path' % (server)) else: - global_ret = False - stdio.error('fail to init %s home path: %s permission denied' % (server, ret.stderr)) + critical('fail to init %s home path: %s permission denied' % (server, ret.stderr)) global_ret and plugin_context.return_true() diff --git a/plugins/oceanbase/3.1.0/parameter.yaml b/plugins/oceanbase/3.1.0/parameter.yaml index e6a3a49..a807c72 100644 --- a/plugins/oceanbase/3.1.0/parameter.yaml +++ b/plugins/oceanbase/3.1.0/parameter.yaml @@ -21,6 +21,27 @@ need_redeploy: true description_en: the directory for the data file description_local: 存储sstable等数据的目录 +- name: clog_dir + type: STRING + min_value: NULL + max_value: NULL + need_redeploy: true + description_en: the directory for the clog file + description_local: 存储clog数据的目录 +- name: slog_dir + type: STRING + min_value: NULL + max_value: NULL + need_redeploy: true + description_en: the directory for the slog file + description_local: 存储slog数据的目录 +- name: ilog_dir + type: STRING + min_value: NULL + max_value: NULL + need_redeploy: true + description_en: the directory for the ilog file + description_local: 存储ilog数据的目录 - name: devname type: STRING min_value: NULL @@ -2129,7 +2150,15 @@ default: '' min_value: NULL max_value: NULL - section: LOAD_BALANCE need_restart: false description_en: password of observer proxyro user - description_local: proxyro用户的密码 \ No newline at end of file + description_local: proxyro用户的密码 +- name: root_password + require: false + type: STRING + default: '' + min_value: NULL + max_value: NULL + need_restart: false + description_en: password
of observer root user + description_local: sys租户root用户的密码 \ No newline at end of file diff --git a/plugins/oceanbase/3.1.0/reload.py b/plugins/oceanbase/3.1.0/reload.py index d564c0d..d47025c 100644 --- a/plugins/oceanbase/3.1.0/reload.py +++ b/plugins/oceanbase/3.1.0/reload.py @@ -51,12 +51,13 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs): for key in global_change_conf: sql = '' try: - if key == 'proxyro_password': + if key in ['proxyro_password', 'root_password']: if global_change_conf[key] != servers_num: stdio.warn('Invalid: proxyro_password is not a single server configuration item') continue value = change_conf[server][key] - sql = 'alter user "proxyro" IDENTIFIED BY "%s"' % value + user = key.split('_')[0] + sql = 'alter user "%s" IDENTIFIED BY "%s"' % (user, value if value else '') stdio.verbose('execute sql: %s' % sql) cursor.execute(sql) continue diff --git a/plugins/oceanbase/3.1.0/start.py b/plugins/oceanbase/3.1.0/start.py index 20a0a4f..7a976c9 100644 --- a/plugins/oceanbase/3.1.0/start.py +++ b/plugins/oceanbase/3.1.0/start.py @@ -92,9 +92,11 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs): root_servers[zone] = '%s:%s:%s' % (server.ip, config['rpc_port'], config['mysql_port']) rs_list_opt = '-r \'%s\'' % ';'.join([root_servers[zone] for zone in root_servers]) + servers_remote_home_path = {} for server in cluster_config.servers: client = clients[server] remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip() + servers_remote_home_path[server] = remote_home_path remote_bin_path = bin_path.replace(local_home_path, remote_home_path) server_config = cluster_config.get_server_conf(server) @@ -154,7 +156,10 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs): for server in clusters_cmd: client = clients[server] stdio.verbose('starting %s observer', server) + remote_repository_path = repository_dir.replace(local_home_path, servers_remote_home_path[server]) + client.add_env('LD_LIBRARY_PATH', '%s/lib:' % remote_repository_path, True) ret = client.execute_command(clusters_cmd[server]) + client.add_env('LD_LIBRARY_PATH', '', True) if not ret: stdio.stop_loading('fail') stdio.error('failed to start %s observer: %s' % (server, ret.stderr)) diff --git a/plugins/oceanbase/3.1.0/start_check.py b/plugins/oceanbase/3.1.0/start_check.py index 77516de..f3ce6a4 100644 --- a/plugins/oceanbase/3.1.0/start_check.py +++ b/plugins/oceanbase/3.1.0/start_check.py @@ -40,7 +40,7 @@ def parse_size(size): def get_port_socket_inode(client, port): port = hex(port)[2:].zfill(4).upper() - cmd = "cat /proc/net/{tcp,udp} | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port + cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port res = client.execute_command(cmd) if not res or not res.stdout.strip(): return False @@ -48,23 +48,59 @@ return res.stdout.strip().split('\n') -def start_check(plugin_context, alert_lv='error', *args, **kwargs): +def parse_size(size): + _bytes = 0 + if not isinstance(size, str) or size.isdigit(): + _bytes = int(size) + else: + units = {"B": 1, "K": 1<<10, "M": 1<<20, "G": 1<<30, "T": 1<<40} + match = re.match(r'([1-9][0-9]*)([B,K,M,G,T])', size) + _bytes = int(match.group(1)) * units[match.group(2)] + return _bytes + + +def formate_size(size): + units = ['B', 'K', 'M', 'G', 'T', 'P'] + idx = 0 + while idx < 5 and size >= 1024: + size /= 1024.0 +
idx += 1 + return '%.1f%s' % (size, units[idx]) + + +def start_check(plugin_context, strict_check=False, *args, **kwargs): def alert(*arg, **kwargs): + global success + if strict_check: + success = False + stdio.error(*arg, **kwargs) + else: + stdio.warn(*arg, **kwargs) + def critical(*arg, **kwargs): global success success = False - alert_f(*arg, **kwargs) + stdio.error(*arg, **kwargs) global stdio cluster_config = plugin_context.cluster_config clients = plugin_context.clients stdio = plugin_context.stdio - alert_f = getattr(stdio, alert_lv) servers_clients = {} servers_port = {} servers_memory = {} servers_disk = {} + server_num = len(cluster_config.servers) + stdio.start_loading('Check before start observer') for server in cluster_config.servers: ip = server.ip client = clients[server] + server_config = cluster_config.get_server_conf_with_default(server) + home_path = server_config['home_path'] + remote_pid_path = '%s/run/observer.pid' % home_path + remote_pid = client.execute_command('cat %s' % remote_pid_path).stdout.strip() + if remote_pid: + if client.execute_command('ls /proc/%s' % remote_pid): + continue + servers_clients[ip] = client if ip not in servers_port: servers_disk[ip] = {} @@ -73,19 +109,18 @@ def start_check(plugin_context, alert_lv='error', *args, **kwargs): memory = servers_memory[ip] ports = servers_port[ip] disk = servers_disk[ip] - server_config = cluster_config.get_server_conf_with_default(server) stdio.verbose('%s port check' % server) for key in ['mysql_port', 'rpc_port']: port = int(server_config[key]) if port in ports: - alert('%s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key'])) + critical('Configuration conflict %s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key'])) continue ports[port] = { 'server': server, 'key': key } if get_port_socket_inode(client, port): - alert('%s:%s port is already used' % (ip, port)) + critical('%s:%s port is already used' % (ip, port)) if 'memory_limit' in server_config: memory['num'] += parse_size(server_config['memory_limit']) elif 'memory_limit_percentage' in server_config: @@ -93,54 +128,87 @@ def start_check(plugin_context, alert_lv='error', *args, **kwargs): else: memory['percentage'] += 80 data_path = server_config['data_dir'] if 'data_dir' in server_config else server_config['home_path'] - if data_path not in disk: - disk[data_path] = 0 - if 'datafile_disk_percentage' in server_config: - disk[data_path] += int(server_config['datafile_disk_percentage']) - else: - disk[data_path] += 90 + if not client.execute_command('ls %s/sstable/block_file' % data_path): + if data_path in disk: + critical('Same Path: %s in %s and %s' % (data_path, server, disk[data_path]['server'])) + continue + disk[data_path] = { + 'need': 90, + 'server': server + } + if 'datafile_size' in server_config and server_config['datafile_size']: + disk[data_path]['need'] = server_config['datafile_size'] + elif 'datafile_disk_percentage' in server_config and server_config['datafile_disk_percentage']: + disk[data_path]['need'] = int(server_config['datafile_disk_percentage']) for ip in servers_clients: client = servers_clients[ip] - ret = client.execute_command('cat /proc/sys/fs/aio-max-nr') - if not ret or not ret.stdout.strip().isdigit(): - alert('(%s) failed to get fs.aio-max-nr' % ip) - elif int(ret.stdout) < 1048576: - alert('(%s) fs.aio-max-nr must not be less than 1048576 (Current value: %s)' % (ip, ret.stdout.strip())) + ret = client.execute_command('cat 
/proc/sys/fs/aio-max-nr /proc/sys/fs/aio-nr') + if not ret: + alert('(%s) failed to get fs.aio-max-nr and fs.aio-nr' % ip) + else: + try: + max_nr, nr = ret.stdout.strip().split('\n') + max_nr, nr = int(max_nr), int(nr) + need = server_num * 20000 + if need > max_nr - nr: + critical('(%s) Insufficient AIO remaining (Avail: %s, Need: %s), The recommended value of fs.aio-max-nr is 1048576' % (ip, max_nr - nr, need)) + elif int(max_nr) < 1048576: + alert('(%s) The recommended value of fs.aio-max-nr is 1048576 (Current value: %s)' % (ip, max_nr)) + except: + alert('(%s) failed to get fs.aio-max-nr and fs.aio-nr' % ip) + stdio.exception('') ret = client.execute_command('ulimit -n') if not ret or not ret.stdout.strip().isdigit(): alert('(%s) failed to get open files number' % ip) - elif int(ret.stdout) < 655350: - alert('(%s) open files number must not be less than 655350 (Current value: %s)' % (ip, ret.stdout.strip())) + else: + max_of = int(ret.stdout) + need = server_num * 20000 + if need > max_of: + critical('(%s) open files number must not be less than %s (Current value: %s)' % (ip, need, max_of)) + elif max_of < 655350: + alert('(%s) The recommended number of open files is 655350 (Current value: %s)' % (ip, max_of)) # memory - if servers_memory[ip]['percentage'] > 100: - alert('(%s) not enough memory' % ip) - else: - ret = client.execute_command("free -b | grep Mem | awk -F' ' '{print $2, $4}'") - if ret: - total_memory, free_memory = ret.stdout.split(' ') - total_memory = int(total_memory) - free_memory = int(free_memory) - total_use = servers_memory[ip]['percentage'] * total_memory / 100 + servers_memory[ip]['num'] - if total_use > free_memory: - alert('(%s) not enough memory' % ip) + ret = client.execute_command("free -b | grep Mem | awk -F' ' '{print $2, $4}'") + if ret: + total_memory, free_memory = ret.stdout.split(' ') + total_memory = int(total_memory) + free_memory = int(free_memory) + total_use = servers_memory[ip]['percentage'] * total_memory / 100 + servers_memory[ip]['num'] + if total_use > free_memory: + critical('(%s) not enough memory. (Free: %s, Need: %s)' % (ip, formate_size(free_memory), formate_size(total_use))) # disk disk = {'/': 0} - ret = client.execute_command('df -h') + ret = client.execute_command('df --output=size,avail,target') if ret: - for v, p in re.findall('(\d+)%\s+(.+)', ret.stdout): - disk[p] = int(v) + for total, avail, path in re.findall('(\d+)\s+(\d+)\s+(.+)', ret.stdout): + disk[path] = { + 'total': int(total) << 10, + 'avail': int(avail) << 10, + 'need': 0 + } for path in servers_disk[ip]: kp = '/' for p in disk: if p in path: if len(p) > len(kp): kp = p - disk[kp] += servers_disk[ip][path] - if disk[kp] > 100: - alert('(%s) %s not enough disk space' % (ip, kp)) + need = servers_disk[ip][path]['need'] + if isinstance(need, int): + disk[kp]['need'] += disk[kp]['total'] * need / 100 + else: + disk[kp]['need'] += parse_size(need) + + for p in disk: + avail = disk[p]['avail'] + need = disk[p]['need'] + if need > avail: + critical('(%s) %s not enough disk space. 
(Avail: %s, Need: %s)' % (ip, p, formate_size(avail), formate_size(need))) if success: - plugin_context.return_true() \ No newline at end of file + stdio.stop_loading('succeed') + plugin_context.return_true() + else: + stdio.stop_loading('fail') \ No newline at end of file diff --git a/plugins/oceanbase/3.1.0/stop.py b/plugins/oceanbase/3.1.0/stop.py index 3fc44c6..fd0cfb8 100644 --- a/plugins/oceanbase/3.1.0/stop.py +++ b/plugins/oceanbase/3.1.0/stop.py @@ -37,7 +37,7 @@ def config_url(ocp_config_server, appname, cid): def get_port_socket_inode(client, port): port = hex(port)[2:].zfill(4).upper() - cmd = "cat /proc/net/{tcp,udp} | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port + cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port res = client.execute_command(cmd) if not res or not res.stdout.strip(): return False @@ -86,16 +86,17 @@ def stop(plugin_context, *args, **kwargs): remote_pid = client.execute_command('cat %s' % remote_pid_path).stdout.strip() if remote_pid and client.execute_command('ps uax | egrep " %s " | grep -v grep' % remote_pid): stdio.verbose('%s observer[pid:%s] stopping ...' % (server, remote_pid)) - client.execute_command('kill -9 -%s; rm -f %s' % (remote_pid, remote_pid_path)) + client.execute_command('kill -9 %s' % (remote_pid)) servers[server] = { 'client': client, 'mysql_port': server_config['mysql_port'], 'rpc_port': server_config['rpc_port'], - 'pid': remote_pid + 'pid': remote_pid, + 'path': remote_pid_path } else: stdio.verbose('%s observer is not running ...' % server) - count = 10 + count = 30 check = lambda client, pid, port: confirm_port(client, pid, port) if count < 5 else get_port_socket_inode(client, port) time.sleep(1) while count and servers: @@ -109,6 +110,7 @@ def stop(plugin_context, *args, **kwargs): break data[key] = '' else: + client.execute_command('rm -f %s' % (data['path'])) stdio.verbose('%s observer is stopped', server) servers = tmp_servers count -= 1 diff --git a/plugins/oceanbase/3.1.0/upgrade.py b/plugins/oceanbase/3.1.0/upgrade.py new file mode 100644 index 0000000..4d9f7c4 --- /dev/null +++ b/plugins/oceanbase/3.1.0/upgrade.py @@ -0,0 +1,54 @@ +# coding: utf-8 +# OceanBase Deploy. +# Copyright (C) 2021 OceanBase +# +# This file is part of OceanBase Deploy. +# +# OceanBase Deploy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OceanBase Deploy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
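Note on the stop.py change above: the process is now killed by pid rather than by process group, the pid file is removed only after the ports are confirmed closed, and the wait budget grows from 10 to 30 rounds. A distilled sketch of that polling pattern (the helper names are hypothetical, not the plugin's API):

import time

def wait_for_exit(is_running, rounds=30, interval=1):
    # poll instead of assuming `kill -9` takes effect synchronously;
    # the caller cleans up the pid file only once this returns True
    while rounds > 0:
        if not is_running():
            return True
        time.sleep(interval)
        rounds -= 1
    return False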
+ + +from __future__ import absolute_import, division, print_function + + + + +def upgrade(plugin_context, stop_plugin, start_plugin, connect_plugin, display_plugin, *args, **kwargs): + components = plugin_context.components + clients = plugin_context.clients + cluster_config = plugin_context.cluster_config + cmd = plugin_context.cmd + options = plugin_context.options + stdio = plugin_context.stdio + + zones_servers = {} + for server in cluster_config.servers: + config = cluster_config.get_server_conf(server) + zone = config['zone'] + if zone not in zones_servers: + zones_servers[zone] = [] + zones_servers[zone].append(server) + + all_servers = cluster_config.servers + for zone in zones_servers: + cluster_config.servers = zones_servers[zone] + stdio.print('upgrade zone "%s"' % zone) + if not stop_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs): + return + if not start_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs): + return + + cluster_config.servers = all_servers + ret = connect_plugin(components, clients, cluster_config, cmd, options, stdio, *args, **kwargs) + if ret and display_plugin(components, clients, cluster_config, cmd, options, stdio, ret.get_return('cursor'), *args, **kwargs): + return plugin_context.return_true() diff --git a/profile/obd.sh b/profile/obd.sh index 0152074..7c15fbc 100644 --- a/profile/obd.sh +++ b/profile/obd.sh @@ -10,8 +10,8 @@ function _obd_complete_func COMPREPLY=() cur="${COMP_WORDS[COMP_CWORD]}" prev="${COMP_WORDS[COMP_CWORD-1]}" - obd_cmd="mirror cluster test" - cluster_cmd="start deploy redeploy restart reload destroy stop edit-config list display" + obd_cmd="mirror cluster test update" + cluster_cmd="start deploy redeploy restart reload destroy stop edit-config list display upgrade" mirror_cmd="clone create list update" test_cmd="mysqltest" if [[ ${cur} == * ]] ; then diff --git a/tool.py b/tool.py index d98188f..259a784 100644 --- a/tool.py +++ b/tool.py @@ -27,8 +27,9 @@ import sys import stat import gzip import shutil +from ssh import LocalClient -from ruamel.yaml import YAML +from ruamel.yaml import YAML, YAMLContextManager, representer if sys.version_info.major == 2: from backports import lzma @@ -154,14 +155,14 @@ class DirectoryUtil(object): elif os.path.isdir(src_name): ret = DirectoryUtil.copy(src_name, dst_name, stdio) and ret else: - FileUtil.copy(src_name, dst_name) + FileUtil.copy(src_name, dst_name, stdio) for link_dest, dst_name in links: - DirectoryUtil.rm(dst_name, stdio) - os.symlink(link_dest, dst_name) + FileUtil.symlink(link_dest, dst_name, stdio) return ret @staticmethod def mkdir(path, mode=0o755, stdio=None): + stdio and getattr(stdio, 'verbose', print)('mkdir %s' % path) try: os.makedirs(path, mode=mode) return True @@ -180,6 +181,7 @@ class DirectoryUtil(object): @staticmethod def rm(path, stdio=None): + stdio and getattr(stdio, 'verbose', print)('rm %s' % path) try: if os.path.exists(path): if os.path.islink(path): @@ -209,6 +211,7 @@ class FileUtil(object): @staticmethod def copy(src, dst, stdio=None): + stdio and getattr(stdio, 'verbose', print)('copy %s %s' % (src, dst)) if os.path.exists(src) and os.path.exists(dst) and os.path.samefile(src, dst): info = "`%s` and `%s` are the same file" % (src, dst) if stdio: @@ -233,21 +236,39 @@ class FileUtil(object): try: if os.path.islink(src): - os.symlink(os.readlink(src), dst) + FileUtil.symlink(os.readlink(src), dst) return True - with FileUtil.open(src, 'rb') as fsrc: - with FileUtil.open(dst, 'wb') as fdst: + 
with FileUtil.open(src, 'rb') as fsrc, FileUtil.open(dst, 'wb') as fdst: FileUtil.copy_fileobj(fsrc, fdst) + os.chmod(dst, os.stat(src).st_mode) return True except Exception as e: - if stdio: + if int(getattr(e, 'errno', -1)) == 26: + if LocalClient.execute_command('/usr/bin/cp -f %s %s' % (src, dst), stdio=stdio): + return True + elif stdio: getattr(stdio, 'exception', print)('copy error') else: raise e return False + @staticmethod + def symlink(src, dst, stdio=None): + stdio and getattr(stdio, 'verbose', print)('link %s %s' % (src, dst)) + try: + if DirectoryUtil.rm(dst, stdio): + os.symlink(src, dst) + return True + except Exception as e: + if stdio: + getattr(stdio, 'exception', print)('link error') + else: + raise e + return False + @staticmethod def open(path, _type='r', stdio=None): + stdio and getattr(stdio, 'verbose', print)('open %s for %s' % (path, _type)) if os.path.exists(path): if os.path.isfile(path): return open(path, _type) @@ -269,6 +290,7 @@ class FileUtil(object): @staticmethod def unzip(source, ztype=None, stdio=None): + stdio and getattr(stdio, 'verbose', print)('unzip %s' % source) if not ztype: ztype = source.split('.')[-1] try: @@ -287,6 +309,7 @@ class FileUtil(object): @staticmethod def rm(path, stdio=None): + stdio and getattr(stdio, 'verbose', print)('rm %s' % path) if not os.path.exists(path): return True try: @@ -306,6 +329,8 @@ class YamlLoader(YAML): def __init__(self, stdio=None, typ=None, pure=False, output=None, plug_ins=None): super(YamlLoader, self).__init__(typ=typ, pure=pure, output=output, plug_ins=plug_ins) self.stdio = stdio + if not self.Representer.yaml_multi_representers and self.Representer.yaml_representers: + self.Representer.yaml_multi_representers = self.Representer.yaml_representers def load(self, stream): try: -- GitLab
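Closing note on the FileUtil.copy change above: errno 26 is ETXTBSY ("Text file busy"), which is what overwriting a running executable such as /usr/bin/obd raises during obd update; GNU cp -f sidesteps it by unlinking the destination and recreating it. A distilled sketch of the same fallback (illustrative, not the module's exact code):

import errno
import shutil
import subprocess

def copy_force(src, dst):
    # shutil.copy opens dst for writing, which fails with ETXTBSY when
    # dst is an executable that is currently running
    try:
        shutil.copy(src, dst)
        return True
    except OSError as e:
        if e.errno == errno.ETXTBSY:
            # cp -f removes dst first, so the running process keeps its old
            # inode while dst ends up pointing at the new file
            return subprocess.call(['cp', '-f', src, dst]) == 0
        raise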