diff --git a/_cmd.py b/_cmd.py index 07c2d02fa2fe7f77b20d01898fa7a7673e44247f..ae249e52232664e1b75b2ca90cabf3590cf6c283 100644 --- a/_cmd.py +++ b/_cmd.py @@ -25,15 +25,17 @@ import os import sys import time import logging +import textwrap from logging import handlers from uuid import uuid1 as uuid -from optparse import OptionParser, OptionGroup, BadOptionError, Option +from optparse import OptionParser, OptionGroup, BadOptionError, Option, IndentedHelpFormatter from core import ObdHome from _stdio import IO from log import Logger from tool import DirectoryUtil, FileUtil, COMMAND_ENV from _errno import DOC_LINK_MSG, LockError +from _environ import ENV_DEV_MODE ROOT_IO = IO(1) @@ -49,7 +51,32 @@ FORBIDDEN_VARS = (CONST_OBD_HOME, CONST_OBD_INSTALL_PRE) OBD_HOME_PATH = os.path.join(os.environ.get(CONST_OBD_HOME, os.getenv('HOME')), '.obd') COMMAND_ENV.load(os.path.join(OBD_HOME_PATH, '.obd_environ'), ROOT_IO) -DEV_MODE = "OBD_DEV_MODE" + + +class OptionHelpFormatter(IndentedHelpFormatter): + + def format_option(self, option): + result = [] + opts = self.option_strings[option] + opt_width = self.help_position - self.current_indent - 2 + if len(opts) > opt_width: + opts = "%*s%s\n" % (self.current_indent, "", opts) + indent_first = self.help_position + else: # start help on same line as opts + opts = "%*s%-*s " % (self.current_indent, "", opt_width, opts) + indent_first = 0 + result.append(opts) + if option.help: + help_text = self.expand_default(option) + help_lines = help_text.split('\n') + if len(help_lines) == 1: + help_lines = textwrap.wrap(help_text, self.help_width) + result.append("%*s%s\n" % (indent_first, "", help_lines[0])) + result.extend(["%*s%s\n" % (self.help_position, "", line) + for line in help_lines[1:]]) + elif opts[-1] != "\n": + result.append("\n") + return "".join(result) class AllowUndefinedOptionParser(OptionParser): @@ -66,12 +93,15 @@ class AllowUndefinedOptionParser(OptionParser): add_help_option=True, prog=None, epilog=None, - allow_undefine=True): + allow_undefine=True, + undefine_warn=True + ): OptionParser.__init__( self, usage, option_list, option_class, version, conflict_handler, description, formatter, add_help_option, prog, epilog ) self.allow_undefine = allow_undefine + self.undefine_warn = undefine_warn def warn(self, msg, file=None): if self.IS_TTY: @@ -88,7 +118,7 @@ class AllowUndefinedOptionParser(OptionParser): key = e.opt_str value = value[len(key)+1:] setattr(values, key.strip('-').replace('-', '_'), value if value != '' else True) - return self.warn(e) + self.undefine_warn and self.warn(e) else: raise e @@ -101,7 +131,7 @@ class AllowUndefinedOptionParser(OptionParser): key = e.opt_str value = value[len(key)+1:] setattr(values, key.strip('-').replace('-', '_'), value if value != '' else True) - return self.warn(e) + self.undefine_warn and self.warn(e) else: raise e @@ -148,7 +178,7 @@ class BaseCommand(object): self.parser.exit(1) def _mk_usage(self): - return self.parser.format_help() + return self.parser.format_help(OptionHelpFormatter()) class ObdCommand(BaseCommand): @@ -163,7 +193,7 @@ class ObdCommand(BaseCommand): version_fobj.seek(0) version = version_fobj.read() if VERSION != version: - for part in ['plugins', 'config_parser', 'mirror/remote']: + for part in ['plugins', 'config_parser', 'optimize', 'mirror/remote']: obd_part_dir = os.path.join(self.OBD_PATH, part) if DirectoryUtil.mkdir(self.OBD_PATH): root_part_path = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/', part) @@ -177,14 +207,11 @@ class ObdCommand(BaseCommand): @property def 
dev_mode(self): - return COMMAND_ENV.get(DEV_MODE) == "1" + return COMMAND_ENV.get(ENV_DEV_MODE) == "1" def parse_command(self): - self.parser.allow_undefine = self.dev_mode - return super(ObdCommand, self).parse_command() - - def parse_command(self): - self.parser.allow_undefine = self.dev_mode + if self.parser.allow_undefine != True: + self.parser.allow_undefine = self.dev_mode return super(ObdCommand, self).parse_command() def do_command(self): @@ -196,11 +223,7 @@ class ObdCommand(BaseCommand): log_dir = os.path.join(self.OBD_PATH, 'log') DirectoryUtil.mkdir(log_dir) log_path = os.path.join(log_dir, 'obd') - logger = Logger('obd') - handler = handlers.TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30) - handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%s] [%%(levelname)s] %%(message)s" % trace_id, "%Y-%m-%d %H:%M:%S")) - logger.addHandler(handler) - ROOT_IO.trace_logger = logger + ROOT_IO.init_trace_logger(log_path, 'obd', trace_id) obd = ObdHome(self.OBD_PATH, self.dev_mode, ROOT_IO) ROOT_IO.track_limit += 1 ROOT_IO.verbose('cmd: %s' % self.cmds) @@ -294,7 +317,7 @@ class DevModeEnableCommand(HiddenObdCommand): super(DevModeEnableCommand, self).__init__('enable', 'Enable Dev Mode') def _do_command(self, obd): - if COMMAND_ENV.set(DEV_MODE, "1", save=True, stdio=obd.stdio): + if COMMAND_ENV.set(ENV_DEV_MODE, "1", save=True, stdio=obd.stdio): obd.stdio.print("Dev Mode: ON") return True return False @@ -306,7 +329,7 @@ class DevModeDisableCommand(HiddenObdCommand): super(DevModeDisableCommand, self).__init__('disable', 'Disable Dev Mode') def _do_command(self, obd): - if COMMAND_ENV.set(DEV_MODE, "0", save=True, stdio=obd.stdio): + if COMMAND_ENV.set(ENV_DEV_MODE, "0", save=True, stdio=obd.stdio): obd.stdio.print("Dev Mode: OFF") return True return False @@ -434,6 +457,11 @@ class MirrorListCommand(ObdCommand): def __init__(self): super(MirrorListCommand, self).__init__('list', 'List mirrors.') + def init(self, cmd, args): + super(MirrorListCommand, self).init(cmd, args) + self.parser.set_usage('%s [section name] [options]\n\nExample: %s local' % (self.prev_cmd, self.prev_cmd)) + return self + def show_pkg(self, name, pkgs): ROOT_IO.print_list( pkgs, @@ -469,6 +497,7 @@ class MirrorListCommand(ObdCommand): lambda x: [x.section_name, x.mirror_type.value, x.enabled, time.strftime("%Y-%m-%d %H:%M", time.localtime(x.repo_age))], title='Mirror Repository List' ) + ROOT_IO.print("Use `obd mirror list
<section name>` for more details") return True @@ -588,6 +617,25 @@ class ClusterCheckForOCPChange(ClusterMirrorCommand): return self._show_help() +class DemoCommand(ClusterMirrorCommand): + + def __init__(self): + super(DemoCommand, self).__init__('demo', 'Quickly start') + self.parser.add_option('-c', '--components', type='string', help="List the components. Multiple components are separated with commas. [oceanbase-ce,obproxy-ce,obagent,prometheus,grafana]\nExample: \nstart oceanbase-ce: obd demo -c oceanbase-ce\n" + + "start oceanbase-ce V3.2.3: obd demo -c oceanbase-ce --oceanbase-ce.version=3.2.3\n" + + "start oceanbase-ce and obproxy-ce: obd demo -c oceanbase-ce,obproxy-ce", default='oceanbase-ce,obproxy-ce,obagent,prometheus,grafana') + self.parser.allow_undefine = True + self.parser.undefine_warn = False + + def _do_command(self, obd): + setattr(self.opts, 'mini', True) + setattr(self.opts, 'force', True) + setattr(self.opts, 'clean', True) + setattr(self.opts, 'force_delete', True) + return obd.demo(self.opts) + + class ClusterAutoDeployCommand(ClusterMirrorCommand): def __init__(self): @@ -931,6 +979,7 @@ class MySQLTestCommand(TestMirrorCommand): self.parser.add_option('--log-pattern', type='string', help='The pattern for collected servers log ', default='*.log') self.parser.add_option('--cluster-mode', type='string', help="The mode of mysqltest") self.parser.add_option('--disable-reboot', action='store_true', help='Never reboot during test.', default=False) + self.parser.add_option('--fast-reboot', action='store_true', help='Reboot using snapshots.', default=False) def _do_command(self, obd): if self.cmds: @@ -955,14 +1004,16 @@ class SysBenchCommand(TestMirrorCommand): self.parser.add_option('--sysbench-script-dir', type='string', help='The directory of the sysbench lua script file. [/usr/sysbench/share/sysbench]', default='/usr/sysbench/share/sysbench') self.parser.add_option('--table-size', type='int', help='Number of data initialized per table. [20000]', default=20000) self.parser.add_option('--tables', type='int', help='Number of initialization tables. [30]', default=30) - self.parser.add_option('--threads', type='int', help='Number of threads to use. [32]', default=16) + self.parser.add_option('--threads', type='int', help='Number of threads to use. [16]', default=16) self.parser.add_option('--time', type='int', help='Limit for total execution time in seconds. [60]', default=60) self.parser.add_option('--interval', type='int', help='Periodically report intermediate statistics with a specified time interval in seconds. 0 disables intermediate reports. [10]', default=10) self.parser.add_option('--events', type='int', help='Limit for total number of events.') self.parser.add_option('--rand-type', type='string', help='Random numbers distribution {uniform,gaussian,special,pareto}.') self.parser.add_option('--percentile', type='int', help='Percentile to calculate in latency statistics. Available values are 1-100. 0 means to disable percentile calculations.') - self.parser.add_option('--skip-trx', dest='{on/off}', type='string', help='Open or close a transaction in a read-only test. ') + self.parser.add_option('--skip-trx', type='string', help='Open or close a transaction in a read-only test. 
{on/off}') + self.parser.add_option('-O', '--optimization', type='int', help='optimization level {0/1}', default=1) + self.parser.add_option('-S', '--skip-cluster-status-check', action='store_true', help='Skip cluster status check', default=False) + self.parser.add_option('--mysql-ignore-errors', type='string', help='list of errors to ignore, or "all". ', default='1062') def _do_command(self, obd): if self.cmds: @@ -993,6 +1044,7 @@ class TPCHCommand(TestMirrorCommand): self.parser.add_option('--dss-config', type='string', help='Directory for dists.dss. [/usr/tpc-h-tools/tpc-h-tools]', default='/usr/tpc-h-tools/tpc-h-tools/') self.parser.add_option('-O', '--optimization', type='int', help='Optimization level {0/1}. [1]', default=1) self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.') + self.parser.add_option('-S', '--skip-cluster-status-check', action='store_true', help='Skip cluster status check', default=False) def _do_command(self, obd): if self.cmds: @@ -1001,6 +1053,41 @@ +class TPCDSCommand(TestMirrorCommand): + + def __init__(self): + super(TPCDSCommand, self).__init__('tpcds', 'Run a TPC-DS test for a deployment.') + self.parser.add_option('--component', type='string', help='Components for a test.') + self.parser.add_option('--test-server', type='string', help='The server for a test. By default, the first root server in the component is the test server.') + self.parser.add_option('--user', type='string', help='Username for a test.') + self.parser.add_option('--password', type='string', help='Password for a test.') + self.parser.add_option('-t', '--tenant', type='string', help='Tenant for a test. [test]', default='test') + self.parser.add_option('--mode', type='string', help='Tenant compatibility mode. {mysql,oracle} [mysql]', default='mysql') + self.parser.add_option('--database', type='string', help='Database for a test. [test]', default='test') + self.parser.add_option('--obclient-bin', type='string', help='OBClient bin path. [obclient]', default='obclient') + self.parser.add_option('--tool-dir', type='string', help='tpc-ds tool dir. [/usr/tpc-ds-tools]') + self.parser.add_option('--dsdgen-bin', type='string', help='dsdgen bin path. [$TOOL_DIR/bin/dsdgen]') + self.parser.add_option('--idx-file', type='string', help='tpcds.idx file path. [$TOOL_DIR/bin/tpcds.idx]') + self.parser.add_option('--dsqgen-bin', type='string', help='dsqgen bin path. [$TOOL_DIR/bin/dsqgen]') + self.parser.add_option('--query-templates-dir', type='string', help='Query templates dir. [$TOOL_DIR/query_templates]') + self.parser.add_option('-s', '--scale', type='int', help='Set the Scale Factor (SF). [1]', default=1) + self.parser.add_option('--disable-generate', '--dg', action='store_true', help='Do not generate test data.') + self.parser.add_option('-p', '--generate-parallel', type='int', help='Generate data parallel number. [0]', default=0) + self.parser.add_option('--tmp-dir', type='string', help='The temporary directory for executing TPC-DS. 
[./tmp]', default='./tmp') + self.parser.add_option('--ddl-path', type='string', help='Directory for DDL files.') + self.parser.add_option('--sql-path', type='string', help='Directory for SQL files.') + self.parser.add_option('--create-foreign-key', '--fk', action='store_true', help='Create foreign key.') + self.parser.add_option('--foreign-key-file', '--fk-file', type='string', help='SQL file for creating foreign key.') + self.parser.add_option('--remote-dir', type='string', help='Directory for the data file on target observers. Make sure that you have read and write access to the directory when you start observer.') + self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.') + + def _do_command(self, obd): + if self.cmds: + return obd.tpcds(self.cmds[0], self.opts) + else: + return self._show_help() + + class TPCCCommand(TestMirrorCommand): def __init__(self): @@ -1024,6 +1111,7 @@ self.parser.add_option('--run-mins', type='int', help='To run for specified minutes.[10]', default=10) self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.') self.parser.add_option('-O', '--optimization', type='int', help='Optimization level {0/1/2}. [1] 0 - No optimization. 1 - Optimize some of the parameters which do not need to restart servers. 2 - Optimize all the parameters and maybe RESTART SERVERS for better performance.', default=1) + self.parser.add_option('-S', '--skip-cluster-status-check', action='store_true', help='Skip cluster status check', default=False) def _do_command(self, obd): if self.cmds: @@ -1040,6 +1128,7 @@ class TestMajorCommand(MajorCommand): self.register_command(SysBenchCommand()) self.register_command(TPCHCommand()) self.register_command(TPCCCommand()) + # self.register_command(TPCDSCommand()) class DbConnectCommand(HiddenObdCommand): @@ -1054,8 +1143,7 @@ self.parser.add_option('-c', '--component', type='string', help='The component used by database connection.') self.parser.add_option('-s', '--server', type='string', help='The server used by database connection. The first server in the configuration will be used by default') - self.parser.add_option('-u', '--user', type='string', help='The username used by d' - 'atabase connection. [root]', default='root') + self.parser.add_option('-u', '--user', type='string', help='The username used by database connection. [root]', default='root') self.parser.add_option('-p', '--password', type='string', help='The password used by database connection.') self.parser.add_option('-t', '--tenant', type='string', help='The tenant used by database connection. [sys]', default='sys') self.parser.add_option('-D', '--database', type='string', help='The database name used by database connection.') @@ -1068,6 +1156,30 @@ return self._show_help() +class DoobaCommand(HiddenObdCommand): + + def init(self, cmd, args): + super(DoobaCommand, self).init(cmd, args) + self.parser.set_usage('%s [options]' % self.prev_cmd) + return self + + def __init__(self): + super(DoobaCommand, self).__init__('dooba', 'A powerful curses tool for OceanBase admin, more than a monitor') + self.parser.add_option('-c', '--component', type='string', help='The component used by database connection.') + self.parser.add_option('-s', '--server', type='string', + help='The server used by database connection. 
The first server in the configuration will be used by default') + self.parser.add_option('-u', '--user', type='string', help='The username used by database connection. [root]', + default='root') + self.parser.add_option('-p', '--password', type='string', help='The password used by database connection.') + self.parser.add_option('--dooba-bin', type='string', help='Dooba bin path.') + + def _do_command(self, obd): + if self.cmds: + return obd.dooba(self.cmds[0], self.opts) + else: + return self._show_help() + + class CommandsCommand(HiddenObdCommand): def init(self, cmd, args): @@ -1093,6 +1205,7 @@ class ToolCommand(HiddenMajorCommand): super(ToolCommand, self).__init__('tool', 'Tools') self.register_command(DbConnectCommand()) self.register_command(CommandsCommand()) + self.register_command(DoobaCommand()) class BenchMajorCommand(MajorCommand): @@ -1121,6 +1234,7 @@ class MainCommand(MajorCommand): def __init__(self): super(MainCommand, self).__init__('obd', '') self.register_command(DevModeMajorCommand()) + self.register_command(DemoCommand()) self.register_command(MirrorMajorCommand()) self.register_command(ClusterMajorCommand()) self.register_command(RepositoryMajorCommand()) diff --git a/_deploy.py b/_deploy.py index 37f2430193e2056e961ada92c024a5b4a7a3b9c2..cb32fa37dbefaf6e88cc0a2d89f9f81a9c039c60 100644 --- a/_deploy.py +++ b/_deploy.py @@ -22,8 +22,10 @@ from __future__ import absolute_import, division, print_function import os import re +import sys import pickle import getpass +import hashlib from copy import deepcopy from enum import Enum @@ -33,12 +35,12 @@ from tool import ConfigUtil, FileUtil, YamlLoader, OrderedDict, COMMAND_ENV from _manager import Manager from _repository import Repository from _stdio import SafeStdio +from _environ import ENV_BASE_DIR yaml = YamlLoader() DEFAULT_CONFIG_PARSER_MANAGER = None ENV = 'env' -BASE_DIR_KEY = "OBD_DEPLOY_BASE_DIR" class ParserError(Exception): @@ -383,6 +385,30 @@ class ClusterConfig(object): self._depends = {} self.parser = parser self._has_package_pattern = None + self._object_hash = None + + if sys.version_info.major == 2: + def __hash__(self): + if self._object_hash is None: + m_sum = hashlib.md5() + m_sum.update(str(self.package_hash).encode('utf-8')) + m_sum.update(str(self.get_global_conf()).encode('utf-8')) + for server in self.servers: + m_sum.update(str(self.get_server_conf(server)).encode('utf-8')) + m_sum.update(str(self.depends).encode('utf-8')) + self._object_hash = int(''.join(['%03d' % ord(v) for v in m_sum.digest()])) + return self._object_hash + else: + def __hash__(self): + if self._object_hash is None: + m_sum = hashlib.md5() + m_sum.update(str(self.package_hash).encode('utf-8')) + m_sum.update(str(self.get_global_conf()).encode('utf-8')) + for server in self.servers: + m_sum.update(str(self.get_server_conf(server)).encode('utf-8')) + m_sum.update(str(self.depends).encode('utf-8')) + self._object_hash = (int(''.join(['%03d' % v for v in m_sum.digest()]))) + return self._object_hash def __eq__(self, other): if not isinstance(other, self.__class__): @@ -446,6 +472,9 @@ class ClusterConfig(object): raise Exception('Circular Dependency: %s and %s' % (self.name, name)) self._depends[name] = cluster_conf + def add_depend_component(self, depend_component_name): + return self._deploy_config.add_depend_for_component(self.name, depend_component_name, save=False) + def del_depend(self, name, component_name): if component_name in self._depends: del self._depends[component_name] @@ -468,6 +497,8 @@ class 
ClusterConfig(object): return False if server not in self._server_conf: return False + if self._temp_conf and key in self._temp_conf: + value = self._temp_conf[key].param_type(value).value if not self._deploy_config.update_component_server_conf(self.name, server, key, value, save): return False self._server_conf[server][key] = value @@ -478,6 +509,8 @@ class ClusterConfig(object): def update_global_conf(self, key, value, save=True): if self._deploy_config is None: return False + if self._temp_conf and key in self._temp_conf: + value = self._temp_conf[key].param_type(value).value if not self._deploy_config.update_component_global_conf(self.name, key, value, save): return False self._update_global_conf(key, value) @@ -488,7 +521,8 @@ class ClusterConfig(object): def _update_global_conf(self, key, value): self._original_global_conf[key] = value - self._global_conf[key] = value + if self._global_conf: + self._global_conf[key] = value def update_rsync_list(self, rsync_list, save=True): if self._deploy_config is None: @@ -558,6 +592,13 @@ class ClusterConfig(object): self._global_conf = None self._clear_cache_server() + def _apply_temp_conf(self, conf): + if self._temp_conf: + for key in conf: + if key in self._temp_conf: + conf[key] = self._temp_conf[key].param_type(conf[key]).value + return conf + def get_temp_conf_item(self, key): if self._temp_conf: return self._temp_conf.get(key) @@ -613,7 +654,9 @@ class ClusterConfig(object): if self._global_conf is None: self._global_conf = deepcopy(self._default_conf) self._global_conf.update(self._get_include_config('config', {})) - self._global_conf.update(self._original_global_conf) + if self._original_global_conf: + self._global_conf.update(self._original_global_conf) + self._global_conf = self._apply_temp_conf(self._global_conf) return self._global_conf def _add_base_dir(self, path): @@ -622,7 +665,7 @@ class ClusterConfig(object): path = os.path.join(self._base_dir, path) else: raise Exception("`{}` need to use absolute paths. 
If you want to use relative paths, please enable developer mode " - "and set environment variables {}".format(RsyncConfig.RSYNC, BASE_DIR_KEY)) + "and set environment variables {}".format(RsyncConfig.RSYNC, ENV_BASE_DIR)) return path @property @@ -717,9 +760,9 @@ class ClusterConfig(object): if server not in self._server_conf: return None if self._cache_server[server] is None: - conf = deepcopy(self._inner_config.get(server.name, {})) + conf = self._apply_temp_conf(deepcopy(self._inner_config.get(server.name, {}))) conf.update(self.get_global_conf()) - conf.update(self._server_conf[server]) + conf.update(self._apply_temp_conf(self._server_conf[server])) self._cache_server[server] = conf return self._cache_server[server] @@ -788,7 +831,7 @@ class DeployConfig(SafeStdio): self.stdio = stdio self._ignore_include_error = False if self.config_parser_manager is None: - raise ParserError('ConfigParaserManager Not Set') + raise ParserError('ConfigParserManager Not Set') self._load() @property @@ -853,32 +896,35 @@ class DeployConfig(SafeStdio): return False def _load(self): - with open(self.yaml_path, 'rb') as f: - depends = {} - self._src_data = self.yaml_loader.load(f) - for key in self._src_data: - if key == 'user': - self.set_user_conf(UserConfig( - ConfigUtil.get_value_from_dict(self._src_data[key], 'username'), - ConfigUtil.get_value_from_dict(self._src_data[key], 'password'), - ConfigUtil.get_value_from_dict(self._src_data[key], 'key_file'), - ConfigUtil.get_value_from_dict(self._src_data[key], 'port', 0, int), - ConfigUtil.get_value_from_dict(self._src_data[key], 'timeout', 0, int), - )) - elif key == 'unuse_lib_repository': - self.unuse_lib_repository = self._src_data['unuse_lib_repository'] - elif key == 'auto_create_tenant': - self.auto_create_tenant = self._src_data['auto_create_tenant'] - elif issubclass(type(self._src_data[key]), dict): - self._add_component(key, self._src_data[key]) - depends[key] = self._src_data[key].get('depends', []) - for comp in depends: - conf = self.components[comp] - for name in depends[comp]: - if name == comp: - continue - if name in self.components: - conf.add_depend(name, self.components[name]) + try: + with open(self.yaml_path, 'rb') as f: + depends = {} + self._src_data = self.yaml_loader.load(f) + for key in self._src_data: + if key == 'user': + self.set_user_conf(UserConfig( + ConfigUtil.get_value_from_dict(self._src_data[key], 'username'), + ConfigUtil.get_value_from_dict(self._src_data[key], 'password'), + ConfigUtil.get_value_from_dict(self._src_data[key], 'key_file'), + ConfigUtil.get_value_from_dict(self._src_data[key], 'port', 0, int), + ConfigUtil.get_value_from_dict(self._src_data[key], 'timeout', 0, int), + )) + elif key == 'unuse_lib_repository': + self.unuse_lib_repository = self._src_data['unuse_lib_repository'] + elif key == 'auto_create_tenant': + self.auto_create_tenant = self._src_data['auto_create_tenant'] + elif issubclass(type(self._src_data[key]), dict): + self._add_component(key, self._src_data[key]) + depends[key] = self._src_data[key].get('depends', []) + for comp in depends: + conf = self.components[comp] + for name in depends[comp]: + if name == comp: + continue + if name in self.components: + conf.add_depend(name, self.components[name]) + except: + pass if not self.user: self.set_user_conf(UserConfig()) @@ -889,7 +935,7 @@ class DeployConfig(SafeStdio): def load_include_file(self, path): if not os.path.isabs(path): raise Exception("`{}` need to use absolute path. 
If you want to use relative paths, please enable developer mode " - "and set environment variables {}".format('include', BASE_DIR_KEY)) + "and set environment variables {}".format('include', ENV_BASE_DIR)) if os.path.isfile(path): with open(path, 'rb') as f: return self.yaml_loader.load(f) @@ -909,7 +955,7 @@ if parser: inner_config = parser.extract_inner_config(cluster_config, src_data) self.inner_config.update_component_config(component_name, inner_config) - + def _dump_inner_config(self): if self.inner_config: self._separate_config() diff --git a/_environ.py b/_environ.py new file mode 100644 index 0000000000000000000000000000000000000000..a544ac8f4f0a125d6a65acfd573f65b0c338b838 --- /dev/null +++ b/_environ.py @@ -0,0 +1,33 @@ +# coding: utf-8 +# OceanBase Deploy. +# Copyright (C) 2021 OceanBase +# +# This file is part of OceanBase Deploy. +# +# OceanBase Deploy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OceanBase Deploy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>. + + +from __future__ import absolute_import, division, print_function + +# obd dev mode. {0/1} +ENV_DEV_MODE = "OBD_DEV_MODE" + +# base path used by runtime dependency sync and include config. {absolute path style} +ENV_BASE_DIR = "OBD_DEPLOY_BASE_DIR" + +# the installation mode of remote repository. {cp/ln} +ENV_REPO_INSTALL_MODE = "OBD_REPO_INSTALL_MODE" + +# disable rsync mode even if the rsync exists. {0/1} +ENV_DISABLE_RSYNC = "OBD_DISABLE_RSYNC" diff --git a/_errno.py b/_errno.py index c1eb871e1bfd6ba907a8dd9ee25ad719a425ec44..cf22afedb711470073ac63da04a03f5957fb9b34 100644 --- a/_errno.py +++ b/_errno.py @@ -50,7 +50,7 @@ class InitDirFailedErrorMessage(object): DOC_LINK = '' -DOC_LINK_MSG = 'See {}'.format(DOC_LINK if DOC_LINK else "https://open.oceanbase.com/docs/obd-cn/V1.4.0/10000000000436999 .") +DOC_LINK_MSG = 'See {}'.format(DOC_LINK if DOC_LINK else "https://www.oceanbase.com/product/ob-deployer/error-codes .") EC_CONFIG_CONFLICT_PORT = OBDErrorCode(1000, 'Configuration conflict {server1}:{port} port is used for {server2}\'s {key}') EC_CONFLICT_PORT = OBDErrorCode(1001, '{server}:{port} port is already used') @@ -76,4 +76,4 @@ EC_OBAGENT_RELOAD_FAILED = OBDErrorCode(4000, 'Fail to reload {server}') EC_OBAGENT_SEND_CONFIG_FAILED = OBDErrorCode(4001, 'Fail to send config file to {server}') # WARN CODE -WC_ULIMIT_CHECK = OBDErrorCode(1007, '({server}) The recommended number of {key} is {need} (Current value: %s)') \ No newline at end of file +WC_ULIMIT_CHECK = OBDErrorCode(1007, '({server}) The recommended number of {key} is {need} (Current value: {now})') \ No newline at end of file diff --git a/_optimize.py b/_optimize.py new file mode 100644 index 0000000000000000000000000000000000000000..3df8c4eb956f26957ce6488649efa4d5989214e4 --- /dev/null +++ b/_optimize.py @@ -0,0 +1,142 @@ +# coding: utf-8 +# OceanBase Deploy. +# Copyright (C) 2021 OceanBase +# +# This file is part of OceanBase Deploy. 
+ +# OceanBase Deploy is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# OceanBase Deploy is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>. + + +from __future__ import absolute_import, division, print_function + +import enum +import os + +from _manager import Manager +from _rpm import Version +from tool import YamlLoader, FileUtil, DynamicLoading + +yaml_loader = YamlLoader() + + +class OptimizeManager(Manager): + + RELATIVE_PATH = "optimize/" + + def __init__(self, home_path, loader=yaml_loader, stdio=None): + self.loader = loader + self.components = {} + self._parser = None + self._optimize_config = None + super(OptimizeManager, self).__init__(home_path, stdio=stdio) + + @property + def optimize_config(self): + if not self._parser: + raise Exception("Optimize parser not loaded") + return self._parser.optimize_config + + def load_config(self, path, stdio=None): + self._optimize_config = None + with FileUtil.open(path, 'rb') as f: + config = self.loader.load(f) + parser_version = config.get("optimize_version", None) + parser = self._get_parser(version=parser_version) + self._parser = parser + self._load_default_optimizers(parser, stdio=stdio) + self._optimize_config = parser.load(config) + + def _search_yaml_file(self, component, version, yaml_name, stdio=None): + component_dir = os.path.join(self.path, component) + if not os.path.exists(component_dir): + stdio.verbose("no optimize config for component {}".format(component)) + return None + yaml_file = os.path.join(component_dir, version, yaml_name) + if not os.path.exists(yaml_file): + stdio.verbose( + 'yaml file {} not found, try to get earlier version.'.format(yaml_file)) + final_version = Version('') + versions = sorted([Version(v) for v in os.listdir(component_dir)], reverse=True) + for v in versions: + yaml_file = os.path.join(component_dir, v, yaml_name) + if os.path.exists(yaml_file) and v <= version: + self.stdio.verbose('find earlier version yaml file: {}'.format(yaml_file)) + break + else: + yaml_file = os.path.join(component_dir, final_version, yaml_name) + stdio.verbose('try to use top yaml file: {}'.format(yaml_file)) + if not os.path.exists(yaml_file): + stdio.verbose('No such yaml file: {}'.format(yaml_file)) + return None + return yaml_file + + def load_default_config(self, test_name, stdio=None): + self._optimize_config = None + parser = self._get_parser() + self._load_default_optimizers(parser, stdio=stdio) + yaml_name = '{}.yaml'.format(test_name) + for component, version in self.components.items(): + config_path = self._search_yaml_file(component, version, yaml_name, stdio=stdio) + if config_path: + with FileUtil.open(config_path, 'rb', stdio=stdio) as f: + config = self.loader.load(f) + parser.load_config_by_component(component, config, stdio=stdio) + self._parser = parser + + def _load_default_optimizers(self, parser, stdio=None): + yaml_name = 'optimizer.yaml' + for component, version in self.components.items(): + optimizer_path = self._search_yaml_file(component, version, yaml_name, stdio=stdio) + if optimizer_path: + with 
FileUtil.open(optimizer_path, 'rb') as f: + config = self.loader.load(f) + parser.load_optimizer_by_component(component, config, stdio=stdio) + + @staticmethod + def _get_latest_version(path): + latest_version = Version('') + for name in os.listdir(path): + latest_version = max(latest_version, Version(name)) + return latest_version + + def _get_parser(self, version=None): + if self._parser: + return self._parser + module_name = 'optimize_parser' + class_name = 'OptimizeParser' + file_name = '{}.py'.format(module_name) + parser_base = os.path.join(self.path, module_name) + if version is None: + version = self._get_latest_version(parser_base) + lib_path = os.path.join(parser_base, version) + path = os.path.join(lib_path, file_name) + if os.path.isfile(path): + DynamicLoading.add_lib_path(lib_path) + self.stdio.verbose('load optimize parser: {}'.format(path)) + module = DynamicLoading.import_module(module_name, self.stdio) + try: + self._parser = getattr(module, class_name)() + return self._parser + except: + self.stdio.exception("") + return None + finally: + DynamicLoading.remove_lib_path(lib_path) + else: + self.stdio.verbose('No such optimize parser: {}'.format(path)) + return None + + def register_component(self, name, version): + self.stdio.verbose('register component {}-{} to optimize manager'.format(name, version)) + self.components[name] = Version(version) diff --git a/_plugin.py b/_plugin.py index 3d8c12266ecad59709021a0f94369523579063a4..7d714a21e556d5cb955ff0f0792eacbcb604c8da 100644 --- a/_plugin.py +++ b/_plugin.py @@ -30,7 +30,7 @@ from copy import deepcopy from _manager import Manager from _rpm import Version from ssh import ConcurrentExecutor -from tool import ConfigUtil, DynamicLoading, YamlLoader +from tool import ConfigUtil, DynamicLoading, YamlLoader, FileUtil yaml = YamlLoader() @@ -38,9 +38,11 @@ yaml = YamlLoader() class PluginType(Enum): + # plugin type = plugin loader class name START = 'StartPlugin' PARAM = 'ParamPlugin' INSTALL = 'InstallPlugin' + SNAP_CONFIG = 'SnapConfigPlugin' PY_SCRIPT = 'PyScriptPlugin' @@ -125,7 +127,7 @@ class PluginContext(object): self.options = options self.dev_mode = dev_mode self.stdio = stdio - self.concurrent_exector = ConcurrentExecutor(32) + self.concurrent_executor = ConcurrentExecutor(32) self._return = PluginReturn() def get_return(self): @@ -265,18 +267,28 @@ class PyScriptPlugin(ScriptPlugin): # def init(self, components, ssh_clients, cluster_config, cmd, options, stdio, *arg, **kwargs): # pass +class Null(object): + + def __init__(self): + pass + class ParamPlugin(Plugin): + class ConfigItemType(object): TYPE_STR = None + NULL = Null() def __init__(self, s): try: self._origin = s self._value = 0 + self.value = self.NULL self._format() + if self.value == self.NULL: + self.value = self._origin except: raise Exception("'%s' is not %s" % (self._origin, self._type_str)) @@ -401,10 +413,48 @@ else: self._value = [] + class Dict(ConfigItemType): + + def _format(self): + if self._origin: + if not isinstance(self._origin, dict): + raise Exception("Invalid value: {} is not a dict.".format(self._origin)) + self._value = self._origin + else: + self._value = self.value = {} + + class List(ConfigItemType): + + def _format(self): + if self._origin: + if not isinstance(self._origin, list): + raise Exception("Invalid value: {} is not a list.".format(self._origin)) + self._value = self._origin + else: + self._value = self.value = [] + + class StringOrKvList(ConfigItemType): + + def _format(self): + if self._origin: + if not isinstance(self._origin, list): + raise Exception("Invalid 
value: {} is not a list.".format(self._origin)) + for item in self._origin: + if not item: + continue + if not isinstance(item, (str, dict)): + raise Exception("Invalid value: {} should be string or key-value format.".format(item)) + if isinstance(item, dict): + if len(item.keys()) != 1: + raise Exception("Invalid value: {} should be single key-value format".format(item)) + self._value = self._origin + else: + self._value = self.value = [] + class Double(ConfigItemType): def _format(self): - self._value = float(self._origin) if self._origin else 0 + self.value = self._value = float(self._origin) if self._origin else 0 class Boolean(ConfigItemType): @@ -413,10 +463,15 @@ class ParamPlugin(Plugin): self._value = self._origin else: _origin = str(self._origin).lower() - if _origin.isdigit() or _origin in ['true', 'false']: + if _origin == 'true': + self._value = True + elif _origin == 'false': + self._value = False + elif _origin.isdigit(): self._value = bool(self._origin) else: - raise Exception('%s is not Boolean') + raise Exception('%s is not Boolean' % _origin) + self.value = self._value class Integer(ConfigItemType): @@ -426,15 +481,15 @@ class ParamPlugin(Plugin): self._origin = 0 else: _origin = str(self._origin) - if _origin.isdigit(): - self._value = int(_origin) - else: - raise Exception('%s is not Integer') + try: + self.value = self._value = int(_origin) + except: + raise Exception('%s is not Integer' % _origin) class String(ConfigItemType): def _format(self): - self._value = str(self._origin) if self._origin else '' + self.value = self._value = str(self._origin) if self._origin else '' class ConfigItem(object): @@ -519,29 +574,35 @@ class ParamPlugin(Plugin): 'MOMENT': ParamPlugin.Moment, 'TIME': ParamPlugin.Time, 'CAPACITY': ParamPlugin.Capacity, - 'STRING_LIST': ParamPlugin.StringList + 'STRING_LIST': ParamPlugin.StringList, + 'DICT': ParamPlugin.Dict, + 'LIST': ParamPlugin.List, + 'PARAM_LIST': ParamPlugin.StringOrKvList } self._src_data = {} with open(self.def_param_yaml_path, 'rb') as f: configs = yaml.load(f) for conf in configs: - param_type = ConfigUtil.get_value_from_dict(conf, 'type', 'STRING').upper() - if param_type in TYPES: - param_type = TYPES[param_type] - else: - param_type = ParamPlugin.String - - self._src_data[conf['name']] = ParamPlugin.ConfigItem( - name=conf['name'], - param_type=param_type, - default=ConfigUtil.get_value_from_dict(conf, 'default', None), - min_value=ConfigUtil.get_value_from_dict(conf, 'min_value', None), - max_value=ConfigUtil.get_value_from_dict(conf, 'max_value', None), - modify_limit=ConfigUtil.get_value_from_dict(conf, 'modify_limit', None), - require=ConfigUtil.get_value_from_dict(conf, 'require', False), - need_restart=ConfigUtil.get_value_from_dict(conf, 'need_restart', False), - need_redeploy=ConfigUtil.get_value_from_dict(conf, 'need_redeploy', False) - ) + try: + param_type = ConfigUtil.get_value_from_dict(conf, 'type', 'STRING').upper() + if param_type in TYPES: + param_type = TYPES[param_type] + else: + param_type = ParamPlugin.String + + self._src_data[conf['name']] = ParamPlugin.ConfigItem( + name=conf['name'], + param_type=param_type, + default=ConfigUtil.get_value_from_dict(conf, 'default', None), + min_value=ConfigUtil.get_value_from_dict(conf, 'min_value', None), + max_value=ConfigUtil.get_value_from_dict(conf, 'max_value', None), + modify_limit=ConfigUtil.get_value_from_dict(conf, 'modify_limit', None), + require=ConfigUtil.get_value_from_dict(conf, 'require', False), + need_restart=ConfigUtil.get_value_from_dict(conf, 
'need_restart', False), + need_redeploy=ConfigUtil.get_value_from_dict(conf, 'need_redeploy', False) + ) + except: + pass except: pass return self._src_data @@ -590,6 +651,40 @@ class ParamPlugin(Plugin): return self._params_default +class SnapConfigPlugin(Plugin): + + PLUGIN_TYPE = PluginType.SNAP_CONFIG + CONFIG_YAML = 'snap_config.yaml' + FLAG_FILE = CONFIG_YAML + _KEYCRE = re.compile(r"\$(\w+)") + + def __init__(self, component_name, plugin_path, version, dev_mode): + super(SnapConfigPlugin, self).__init__(component_name, plugin_path, version, dev_mode) + self.config_path = os.path.join(self.plugin_path, self.CONFIG_YAML) + self._config = None + self._file_hash = None + + def __hash__(self): + if self._file_hash is None: + self._file_hash = int(''.join(['%03d' % (ord(v) if isinstance(v, str) else v) for v in FileUtil.checksum(self.config_path)])) + return self._file_hash + + @property + def config(self): + if self._config is None: + with open(self.config_path, 'rb') as f: + self._config = yaml.load(f) + return self._config + + @property + def backup(self): + return self.config.get('backup', []) + + @property + def clean(self): + return self.config.get('clean', []) + + class InstallPlugin(Plugin): class FileItemType(Enum): @@ -621,6 +716,7 @@ class InstallPlugin(Plugin): self.file_map_path = os.path.join(self.plugin_path, self.FILES_MAP_YAML) self._file_map = {} self._file_map_data = None + self._check_value = None @classmethod def var_replace(cls, string, var): @@ -644,6 +740,12 @@ class InstallPlugin(Plugin): return ''.join(done) + @property + def check_value(self): + if self._check_value is None: + self._check_value = os.path.getmtime(self.file_map_path) + return self._check_value + @property def file_map_data(self): if self._file_map_data is None: diff --git a/_repository.py b/_repository.py index 9088c34ed003d07f0988b514d5f6c4a6b1cef5f0..600471757ee4dc7f1492ceda00b3af374beda1c6 100644 --- a/_repository.py +++ b/_repository.py @@ -25,13 +25,14 @@ import sys import time import hashlib from glob import glob +from multiprocessing import cpu_count +from multiprocessing.pool import Pool from _rpm import Package, PackageInfo, Version from _arch import getBaseArch from tool import DirectoryUtil, FileUtil, YamlLoader from _manager import Manager from _plugin import InstallPlugin -from ssh import LocalClient class LocalPackage(Package): @@ -122,15 +123,7 @@ class LocalPackage(Package): filelinktos.append(os.readlink(target_path)) filemodes.append(-24065) else: - ret = LocalClient().execute_command('md5sum {}'.format(target_path)) - if ret: - m_value = ret.stdout.strip().split(' ')[0].encode('utf-8') - else: - m = hashlib.md5() - with open(target_path, 'rb') as f: - m.update(f.read()) - m_value = m.hexdigest().encode(sys.getdefaultencoding()) - # raise Exception('Failed to get md5sum for {}, error: {}'.format(target_path, ret.stderr)) + m_value = FileUtil.checksum(target_path) m_sum.update(m_value) filemd5s.append(m_value) filelinktos.append('') @@ -149,6 +142,73 @@ class LocalPackage(Package): return self.RpmObject(self.headers, self.files) +class ExtractFileInfo(object): + + def __init__(self, src_path, target_path, mode): + self.src_path = src_path + self.target_path = target_path + self.mode = mode + + +class ParallerExtractWorker(object): + + def __init__(self, pkg, files, stdio=None): + self.pkg = pkg + self.files = files + self.stdio = stdio + + @staticmethod + def extract(worker): + with worker.pkg.open() as rpm: + for info in worker.files: + if os.path.exists(info.target_path): + 
continue + fd = rpm.extractfile(info.src_path) + with FileUtil.open(info.target_path, 'wb', stdio=worker.stdio) as f: + FileUtil.copy_fileobj(fd, f) + if info.mode != 0o744: + os.chmod(info.target_path, info.mode) + return True + + +class ParallerExtractor(object): + + MAX_PARALLER = cpu_count() if cpu_count() else 8 + + def __init__(self, pkg, files, stdio=None): + self.pkg = pkg + self.files = files + self.stdio = stdio + + def extract(self): + workers = [] + file_num = len(self.files) + if not file_num: + return True + paraler = int(min(self.MAX_PARALLER, file_num)) + size = min(100, int(file_num / paraler)) + size = int(max(10, size)) + index = 0 + while index < file_num: + p_index = index + size + workers.append(ParallerExtractWorker( + self.pkg, + self.files[index:p_index], + stdio=self.stdio + )) + index = p_index + + pool = Pool(processes=paraler) + try: + results = pool.map(ParallerExtractWorker.extract, workers) + for r in results: + if not r: + return False + except KeyboardInterrupt: + if pool: + pool.close() + pool = None + finally: + pool and pool.close() + return True + + class Repository(PackageInfo): _DATA_FILE = '.data' @@ -251,7 +311,7 @@ self.stdio and getattr(self.stdio, 'print', '%s is a shadow repository' % self) return False hash_path = os.path.join(self.repository_dir, '.hash') - if self.hash == pkg.md5 and self.file_check(plugin): + if self.hash == pkg.md5 and self.file_check(plugin) and self.install_time > plugin.check_value: return True self.clear() try: @@ -291,6 +351,8 @@ if path.startswith(n_dir): need_files[path] = os.path.join(need_dirs[n_dir], path[len(n_dir):]) break + + need_extract_files = [] for src_path in need_files: if src_path not in files: raise Exception('%s not found in packge' % src_path) @@ -299,17 +361,17 @@ return idx = files[src_path] if filemd5s[idx]: - fd = rpm.extractfile(src_path) - self.stdio and getattr(self.stdio, 'verbose', print)('extract %s to %s' % (src_path, target_path)) - with FileUtil.open(target_path, 'wb', stdio=self.stdio) as f: - FileUtil.copy_fileobj(fd, f) - mode = filemodes[idx] & 0x1ff - if mode != 0o744: - os.chmod(target_path, mode) + need_extract_files.append(ExtractFileInfo( + src_path, + target_path, + filemodes[idx] & 0x1ff + )) elif filelinktos[idx]: links[target_path] = filelinktos[idx] else: raise Exception('%s is directory' % src_path) + + ParallerExtractor(pkg, need_extract_files, stdio=self.stdio).extract() for link in links: self.stdio and getattr(self.stdio, 'verbose', print)('link %s to %s' % (links[link], link)) diff --git a/_stdio.py b/_stdio.py index 949607d24d2515c05e2e8994539d81c8efacf5c7..67ac4c3408a4e7ba7937a61175bf53afbffe90b9 100644 --- a/_stdio.py +++ b/_stdio.py @@ -23,9 +23,13 @@ from __future__ import absolute_import, division, print_function import os import signal import sys +import fcntl import traceback import inspect2 import six +import logging +from copy import deepcopy +from logging import handlers from enum import Enum from halo import Halo, cursor @@ -35,6 +39,8 @@ from progressbar import AdaptiveETA, Bar, SimpleProgress, ETA, FileTransferSpeed from types import MethodType from inspect2 import Parameter +from log import Logger + if sys.version_info.major == 3: raw_input = input @@ -55,6 +61,87 @@ class BufferIO(object): return s +# Helper for blocking and non-blocking reads from sys.stdin. +class SysStdin(object): + + NONBLOCK = False + STATS = None + FD = None + + @classmethod + def fileno(cls): + if cls.FD is None: + cls.FD = sys.stdin.fileno() + return cls.FD + + @classmethod + def stats(cls): + if cls.STATS 
is None: + cls.STATS = fcntl.fcntl(cls.fileno(), fcntl.F_GETFL) + return cls.STATS + + @classmethod + def nonblock(cls): + if cls.NONBLOCK is False: + fcntl.fcntl(cls.fileno(), fcntl.F_SETFL, cls.stats() | os.O_NONBLOCK) + cls.NONBLOCK = True + + @classmethod + def block(cls): + if cls.NONBLOCK: + fcntl.fcntl(cls.fileno(), fcntl.F_SETFL, cls.stats()) + cls.NONBLOCK = False + + @classmethod + def readline(cls, blocked=False): + if blocked: + cls.block() + else: + cls.nonblock() + return cls._readline() + + @classmethod + def read(cls, blocked=False): + return ''.join(cls.readlines(blocked=blocked)) + + @classmethod + def readlines(cls, blocked=False): + if blocked: + cls.block() + else: + cls.nonblock() + return cls._readlines() + + @classmethod + def _readline(cls): + if cls.NONBLOCK: + try: + for line in sys.stdin: + return line + except IOError: + return '' + finally: + cls.block() + else: + return sys.stdin.readline() + + @classmethod + def _readlines(cls): + if cls.NONBLOCK: + lines = [] + try: + for line in sys.stdin: + lines.append(line) + except IOError: + pass + finally: + cls.block() + return lines + else: + return sys.stdin.readlines() + + + + class FormtatText(object): @staticmethod @@ -234,11 +321,11 @@ WARNING_PREV = FormtatText.warning('[WARN]') ERROR_PREV = FormtatText.error('[ERROR]') IS_TTY = sys.stdin.isatty() + INPUT = SysStdin def __init__(self, level, msg_lv=MsgLevel.DEBUG, - trace_logger=None, use_cache=False, track_limit=0, root_io=None, ): self.level = level self.msg_lv = msg_lv - self.trace_logger = trace_logger + self.log_path = None + self.trace_id = None + self.log_name = 'default' + self._trace_logger = None self._log_cache = [] if use_cache else None self._root_io = root_io self.track_limit = track_limit @@ -257,6 +348,34 @@ self._cur_out_obj = self._out_obj self._before_critical = None + def init_trace_logger(self, log_path, log_name=None, trace_id=None): + if self._trace_logger is None: + self.log_path = log_path + if trace_id: + self.trace_id = trace_id + if log_name: + self.log_name = log_name + + def __getstate__(self): + state = {} + for key in self.__dict__: + state[key] = self.__dict__[key] + for key in ['_trace_logger', 'sync_obj', '_out_obj', '_cur_out_obj', '_before_critical']: + state[key] = None + return state + + @property + def trace_logger(self): + if self.log_path and self._trace_logger is None: + self._trace_logger = Logger(self.log_name) + handler = handlers.TimedRotatingFileHandler(self.log_path, when='midnight', interval=1, backupCount=30) + if self.trace_id: + handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%s] [%%(levelname)s] %%(message)s" % self.trace_id, "%Y-%m-%d %H:%M:%S")) + else: + handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%%(levelname)s] %%(message)s", "%Y-%m-%d %H:%M:%S")) + self._trace_logger.addHandler(handler) + return self._trace_logger + @property def log_cache(self): if self._root_io: @@ -417,13 +536,17 @@ msg_lv = self.msg_lv key = "%s-%s" % (pid, msg_lv) if key not in self.sub_ios: - self.sub_ios[key] = self.__class__( + sub_io = self.__class__( self.level + 1, msg_lv=msg_lv, - trace_logger=self.trace_logger, track_limit=self.track_limit, root_io=self._root_io if self._root_io else self ) + sub_io.log_name = self.log_name + sub_io.log_path = self.log_path + sub_io.trace_id = self.trace_id + sub_io._trace_logger = self.trace_logger + self.sub_ios[key] = sub_io 
return self.sub_ios[key] def print_list(self, ary, field_names=None, exp=lambda x: x if isinstance(x, (list, tuple)) else [x], show_index=False, start=0, **kwargs): @@ -445,11 +568,18 @@ class IO(object): table.add_row(row) self.print(table) + def read(self, msg='', blocked=False): + if msg: + self._print(MsgLevel.INFO, msg) + return self.INPUT.read(blocked) + def confirm(self, msg): + msg = '%s [y/n]: ' % msg + self.print(msg, end='') if self.IS_TTY: while True: try: - ans = raw_input('%s [y/n]: ' % msg) + ans = raw_input() if ans == 'y': return True if ans == 'n': @@ -595,6 +725,8 @@ class StdIO(object): self._warn_func = getattr(self.io, "warn", print) def __getattr__(self, item): + if item.startswith('__'): + return super(StdIO, self).__getattribute__(item) if self.io is None: return FAKE_RETURN if item not in self._attrs: diff --git a/core.py b/core.py index 0c220697ed2e95758518ade2db780dd1813caf4a..d25f9a354e49b30cc5e2b6bde7febc61dc7bb0a1 100644 --- a/core.py +++ b/core.py @@ -22,33 +22,25 @@ from __future__ import absolute_import, division, print_function import re import os -import sys import time -import fcntl from optparse import Values +from copy import deepcopy import tempfile from subprocess import call as subprocess_call -from prettytable import PrettyTable -from halo import Halo from ssh import SshClient, SshConfig -from tool import ConfigUtil, FileUtil, DirectoryUtil, YamlLoader, timeout, COMMAND_ENV +from tool import ConfigUtil, FileUtil, DirectoryUtil, YamlLoader, timeout, COMMAND_ENV, OrderedDict from _stdio import MsgLevel from _rpm import Version from _mirror import MirrorRepositoryManager, PackageInfo from _plugin import PluginManager, PluginType, InstallPlugin -from _repository import RepositoryManager, LocalPackage -from _deploy import DeployManager, DeployStatus, DeployConfig, DeployConfigStatus, BASE_DIR_KEY, InnerConfigKeywords -from _lock import LockManager +from _deploy import DeployManager, DeployStatus, DeployConfig, DeployConfigStatus, Deploy from _repository import RepositoryManager, LocalPackage, Repository -from _deploy import ( - DeployManager, DeployStatus, - DeployConfig, DeployConfigStatus, - ParserError, Deploy -) from _errno import EC_SOME_SERVER_STOPED from _lock import LockManager +from _optimize import OptimizeManager +from _environ import ENV_REPO_INSTALL_MODE, ENV_BASE_DIR class ObdHome(object): @@ -65,8 +57,10 @@ class ObdHome(object): self._deploy_manager = None self._plugin_manager = None self._lock_manager = None + self._optimize_manager = None self.stdio = None self._stdio_func = None + self.ssh_clients = {} self.set_stdio(stdio) self.lock_manager.global_sh_lock() @@ -100,6 +94,12 @@ class ObdHome(object): self._lock_manager = LockManager(self.home_path, self.stdio) return self._lock_manager + @property + def optimize_manager(self): + if not self._optimize_manager: + self._optimize_manager = OptimizeManager(self.home_path, stdio=self.stdio) + return self._optimize_manager + def _obd_update_lock(self): self.lock_manager.global_ex_lock() @@ -112,7 +112,7 @@ class ObdHome(object): self._stdio_func = {} if not self.stdio: return - for func in ['start_loading', 'stop_loading', 'print', 'confirm', 'verbose', 'warn', 'exception', 'error', 'critical', 'print_list']: + for func in ['start_loading', 'stop_loading', 'print', 'confirm', 'verbose', 'warn', 'exception', 'error', 'critical', 'print_list', 'read']: self._stdio_func[func] = getattr(self.stdio, func, _print) def _call_stdio(self, func, msg, *arg, **kwarg): @@ -140,16 +140,23 @@ class 
ObdHome(object): return errors def get_clients(self, deploy_config, repositories): - ssh_clients = {} - self._call_stdio('start_loading', 'Open ssh connection') + servers = set() + user_config = deploy_config.user + if user_config not in self.ssh_clients: + self.ssh_clients[user_config] = {} + ssh_clients = self.ssh_clients[user_config] + for repository in repositories: cluster_config = deploy_config.components[repository.name] - # ssh check - self.ssh_clients_connect(ssh_clients, cluster_config.servers, deploy_config.user) - self._call_stdio('stop_loading', 'succeed') + for server in cluster_config.servers: + if server not in ssh_clients: + servers.add(server) + if servers: + self.ssh_clients_connect(servers, ssh_clients, user_config) return ssh_clients - def ssh_clients_connect(self, ssh_clients, servers, user_config): + def ssh_clients_connect(self, servers, ssh_clients, user_config): + self._call_stdio('start_loading', 'Open ssh connection') for server in servers: if server not in ssh_clients: ssh_clients[server] = SshClient( @@ -164,6 +171,8 @@ self.stdio ) ssh_clients[server].connect() + self._call_stdio('stop_loading', 'succeed') + return ssh_clients def search_plugin(self, repository, plugin_type, no_found_exit=True): self._call_stdio('verbose', 'Search %s plugin for %s' % (plugin_type.name.lower(), repository.name)) @@ -273,9 +282,13 @@ not self._call_stdio('confirm', 'Found a higher version\n%s\nDo you want to use it?' % pkg) ) or update_if_need is False ): - repositories.append(repository) - self._call_stdio('verbose', 'Use repository %s' % repository) - self._call_stdio('print', '%s-%s already installed.' % (repository.name, repository.version)) + if pkg and repository.release == pkg.release: + pkgs.append(pkg) + self._call_stdio('verbose', '%s is the same as %s. Use package %s' % (pkg, repository, pkg)) + else: + repositories.append(repository) + self._call_stdio('verbose', 'Use repository %s' % repository) + self._call_stdio('print', '%s-%s already installed.' % (repository.name, repository.version)) continue if config.version and pkg.version != config.version: self._call_stdio('warn', 'No such package %s-%s-%s. Use similar package %s-%s-%s.' 
% (component, config.version, config.release, pkg.name, pkg.version, pkg.release)) @@ -357,12 +370,22 @@ class ObdHome(object): if deploy_config.components[component_name].servers != deploy.deploy_config.components[component_name].servers: return True return False + if not self.stdio: + raise IOError("IO Not Found") + self._call_stdio('verbose', 'Get Deploy by name') deploy = self.deploy_manager.get_deploy_config(name) param_plugins = {} repositories, pkgs = [], [] is_deployed = deploy and deploy.deploy_info.status not in [DeployStatus.STATUS_CONFIGURED, DeployStatus.STATUS_DESTROYED] is_started = deploy and deploy.deploy_info.status in [DeployStatus.STATUS_RUNNING, DeployStatus.STATUS_STOPPED] + user_input = self._call_stdio('read', '') + if not user_input and not self.stdio.IS_TTY: + time.sleep(0.1) + user_input = self._call_stdio('read', '') + if not user_input: + self._call_stdio('error', 'Input is empty') + return False initial_config = '' if deploy: try: @@ -371,17 +394,23 @@ class ObdHome(object): path = deploy.deploy_config.yaml_path else: path = Deploy.get_temp_deploy_yaml_path(deploy.config_dir) - self._call_stdio('verbose', 'Load %s' % path) - with open(path, 'r') as f: - initial_config = f.read() + if user_input: + initial_config = user_input + else: + self._call_stdio('verbose', 'Load %s' % path) + with open(path, 'r') as f: + initial_config = f.read() except: self._call_stdio('exception', '') msg = 'Save deploy "%s" configuration' % name else: - if not self.stdio: - return False - if not self._call_stdio('confirm', 'No such deploy: %s. Create?' % name): - return False + if user_input: + initial_config = user_input + else: + if not self.stdio: + return False + if not initial_config and not self._call_stdio('confirm', 'No such deploy: %s. Create?' % name): + return False msg = 'Create deploy "%s" configuration' % name if is_deployed: repositories = self.load_local_repositories(deploy.deploy_info) @@ -405,10 +434,11 @@ class ObdHome(object): self.lock_manager.set_try_times(-1) config_status = DeployConfigStatus.UNCHNAGE while True: - tf.seek(0) - self._call_stdio('verbose', '%s %s' % (EDITOR, tf.name)) - subprocess_call([EDITOR, tf.name]) - self._call_stdio('verbose', 'Load %s' % tf.name) + if not user_input: + tf.seek(0) + self._call_stdio('verbose', '%s %s' % (EDITOR, tf.name)) + subprocess_call([EDITOR, tf.name]) + self._call_stdio('verbose', 'Load %s' % tf.name) try: deploy_config = DeployConfig( tf.name, yaml_loader=YamlLoader(self.stdio), @@ -416,8 +446,10 @@ class ObdHome(object): inner_config=deploy.deploy_config.inner_config if deploy else None ) deploy_config.allow_include_error() + if not deploy_config.get_base_dir(): + deploy_config.set_base_dir('/', save=False) except Exception as e: - if confirm(e): + if not user_input and confirm(e): continue break @@ -432,6 +464,8 @@ class ObdHome(object): elif is_deployed: if deploy_config.components.keys() != deploy.deploy_config.components.keys() or is_server_list_change(deploy_config): if not self._call_stdio('confirm', 'Modifications to the deployment architecture take effect after you redeploy the architecture. Are you sure that you want to start a redeployment? '): + if user_input: + return False continue config_status = DeployConfigStatus.NEED_REDEPLOY @@ -449,6 +483,8 @@ class ObdHome(object): break if comp_attr_changed: if not self._call_stdio('confirm', 'Modifications to the version, release or hash of the component take effect after you redeploy the cluster. Are you sure that you want to start a redeployment? 
'): + if user_input: + return False continue config_status = DeployConfigStatus.NEED_REDEPLOY @@ -462,6 +498,8 @@ class ObdHome(object): break if rsync_conf_changed: if not self._call_stdio('confirm', 'Modifications to the rsync config of a deployed cluster take effect after you redeploy the cluster. Are you sure that you want to start a redeployment? '): + if user_input: + return False continue config_status = DeployConfigStatus.NEED_REDEPLOY @@ -522,6 +560,8 @@ class ObdHome(object): errors.append('[%s] %s: %s' % (component_name, server, str(e))) if errors: self._call_stdio('print', '\n'.join(errors)) + if user_input: + return False if self._call_stdio('confirm', 'Modifications take effect after a redeployment. Are you sure that you want to start a redeployment?'): config_status = DeployConfigStatus.NEED_REDEPLOY elif self._call_stdio('confirm', 'Continue to edit?'): @@ -561,7 +601,7 @@ class ObdHome(object): ret = True if deploy: if deploy.deploy_info.status == DeployStatus.STATUS_RUNNING or ( - config_status == DeployConfigStatus.NEED_REDEPLOY and is_deployed + config_status == DeployConfigStatus.NEED_REDEPLOY and is_deployed ): msg += deploy.effect_tip() except Exception as e: @@ -871,12 +911,14 @@ class ObdHome(object): # Check whether the components have the parameter plugins and apply the plugins self.search_param_plugin_and_apply(repositories, deploy_config) - # Parameter check - errors = self.deploy_param_check(repositories, deploy_config) - if errors: - self._call_stdio('stop_loading', 'fail') - self._call_stdio('error', '\n'.join(errors)) - return False + if not getattr(opt, 'skip_param_check', False): + # Parameter check + errors = self.deploy_param_check(repositories, deploy_config) + if errors: + self._call_stdio('stop_loading', 'fail') + self._call_stdio('error', '\n'.join(errors)) + return False + self._call_stdio('stop_loading', 'succeed') # Get the client @@ -885,11 +927,12 @@ class ObdHome(object): gen_config_plugins = self.search_py_script_plugin(repositories, 'generate_config') component_num = len(repositories) + auto_depend = getattr(opt, 'auto_depend', False) for repository in repositories: cluster_config = deploy_config.components[repository.name] self._call_stdio('verbose', 'Call %s for %s' % (gen_config_plugins[repository], repository)) - ret = gen_config_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], opt, self.stdio, deploy_config) + ret = gen_config_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], opt, self.stdio, deploy_config, auto_depend=auto_depend) if ret: component_num -= 1 @@ -984,6 +1027,26 @@ class ObdHome(object): return component_num == 0 + def sort_repository_by_depend(self, repositories, deploy_config): + sorted_repositories = [] + sorted_components = {} + while repositories: + temp_repositories = [] + for repository in repositories: + cluster_config = deploy_config.components.get(repository.name) + for component_name in cluster_config.depends: + if component_name not in sorted_components: + temp_repositories.append(repository) + break + else: + sorted_components[repository.name] = 1 + sorted_repositories.append(repository) + if len(temp_repositories) == len(repositories): + sorted_repositories += temp_repositories + break + repositories = temp_repositories + return sorted_repositories + def change_deploy_config_style(self, name, options=Values()): self._call_stdio('verbose', 'Get Deploy by name') deploy = self.deploy_manager.get_deploy_config(name) @@ -1016,6 +1079,20 @@ class
ObdHome(object): else: components = deploy_config.components.keys() + self._call_stdio('start_loading', 'Load param plugin') + + # Get the repository + if deploy_info.status not in [DeployStatus.STATUS_CONFIGURED, DeployStatus.STATUS_DESTROYED]: + repositories = self.load_local_repositories(deploy_info) + else: + repositories = [] + for component_name in components: + repositories.append(self.repository_manager.get_repository_allow_shadow(component_name, '100000.0')) + + # Check whether the components have the parameter plugins and apply the plugins + self.search_param_plugin_and_apply(repositories, deploy_config) + self._call_stdio('stop_loading', 'succeed') + self._call_stdio('start_loading', 'Change style') try: parsers = {} @@ -1035,6 +1112,72 @@ class ObdHome(object): self._call_stdio('stop_loading', 'fail') return False + def demo(self, opt=Values()): + name = 'demo' + self._call_stdio('verbose', 'Get Deploy by name') + deploy = self.deploy_manager.get_deploy_config(name) + if deploy: + self._call_stdio('verbose', 'Get deploy info') + deploy_info = deploy.deploy_info + self._call_stdio('verbose', 'Judge deploy status') + if deploy_info.status == DeployStatus.STATUS_DEPLOYED: + if not self.destroy_cluster(name): + return False + elif deploy_info.status not in [DeployStatus.STATUS_CONFIGURED, DeployStatus.STATUS_DESTROYED]: + self._call_stdio('error', 'Deploy "%s" is %s. You cannot deploy an %s cluster.' % (name, deploy_info.status.value, deploy_info.status.value)) + return False + + components = set() + for component_name in getattr(opt, 'components', '').split(','): + if component_name: + components.add(component_name) + if not components: + self._call_stdio('error', 'Use `-c/--components` to specify the components to be deployed') + return False + global_key = 'global' + home_path_key = 'home_path' + global_config = {home_path_key: os.getenv('HOME')} + opt_config = {} + for key in opt.__dict__: + tmp = key.split('.', 1) + if len(tmp) == 1: + if key == home_path_key: + global_config[key] = opt.__dict__[key] + else: + component_name = tmp[0] + if component_name not in components: + component_name = component_name.replace('_', '-') + if component_name not in opt_config: + opt_config[component_name] = {global_key: {}} + if tmp[1] in ['version', 'tag', 'package_hash', 'release']: + _config = opt_config[component_name] + else: + _config = opt_config[component_name][global_key] + _config[tmp[1]] = opt.__dict__[key] + + configs = OrderedDict() + for component_name in components: + configs[component_name] = { + 'servers': ['127.0.0.1'], + global_key: deepcopy(global_config) + } + configs[component_name][global_key][home_path_key] = os.path.join(configs[component_name][global_key][home_path_key], component_name) + if component_name in opt_config: + configs[component_name][global_key].update(opt_config[component_name][global_key]) + del opt_config[component_name][global_key] + configs[component_name].update(opt_config[component_name]) + + with tempfile.NamedTemporaryFile(suffix=".yaml", mode='w') as tf: + yaml_loader = YamlLoader(self.stdio) + yaml_loader.dump(configs, tf) + setattr(opt, 'config', tf.name) + setattr(opt, 'skip_param_check', True) + setattr(opt, 'auto_depend', True) + if not self.genconfig(name, opt): + return False + setattr(opt, 'config', '') + return self.deploy_cluster(name, opt) and self.start_cluster(name, [], opt) + def deploy_cluster(self, name, opt=Values()): self._call_stdio('verbose', 'Get Deploy by name') deploy = self.deploy_manager.get_deploy_config(name) @@ -1080,13
+1223,21 @@ class ObdHome(object): self._call_stdio('error', '%s\'s servers list is empty.' % component_name) return False - if self.dev_mode: - base_dir = COMMAND_ENV.get(BASE_DIR_KEY, '') + install_mode = COMMAND_ENV.get(ENV_REPO_INSTALL_MODE) + if not install_mode: + install_mode = 'cp' if self.dev_mode else 'ln' + + if install_mode == 'cp': deploy_config.enable_cp_install_mode(save=False) - else: - base_dir = '' + elif install_mode == 'ln': deploy_config.enable_ln_install_mode(save=False) - deploy_config.set_base_dir(base_dir, save=False) + else: + self._call_stdio('error', 'Invalid repository install mode: {}'.format(install_mode)) + return False + + if self.dev_mode: + base_dir = COMMAND_ENV.get(ENV_BASE_DIR, '') + deploy_config.set_base_dir(base_dir, save=False) # Check the best suitable mirror for the components and installation plugins. Install locally repositories, install_plugins = self.search_components_from_mirrors_and_install(deploy_config) @@ -1341,9 +1492,13 @@ class ObdHome(object): self._call_stdio('print', 'Deploy "%s" is running' % name) return True + repositories = self.sort_repository_by_depend(repositories, deploy_config) + strict_check = getattr(options, 'strict_check', False) success = True + repository_dir_map = {} for repository in repositories: + repository_dir_map[repository.name] = repository.repository_dir if repository.name not in components: continue if repository not in start_check_plugins: @@ -1359,6 +1514,8 @@ class ObdHome(object): return False component_num = len(components) + display_repositories = [] + connect_ret = {} for repository in repositories: if repository.name not in components: continue @@ -1373,7 +1530,7 @@ class ObdHome(object): update_deploy_status = update_deploy_status and start_all self._call_stdio('verbose', 'Call %s for %s' % (start_plugins[repository], repository)) - ret = start_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, self.home_path, repository.repository_dir) + ret = start_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, self.home_path, repository.repository_dir, repository_dir_map=repository_dir_map, deploy_name=deploy.name) if ret: need_bootstrap = ret.get_return('need_bootstrap') else: @@ -1385,15 +1542,18 @@ class ObdHome(object): if ret: db = ret.get_return('connect') cursor = ret.get_return('cursor') + connect_ret[repository] = ret.kwargs else: break if need_bootstrap and start_all: - self._call_stdio('print', 'Initialize cluster') + self._call_stdio('start_loading', 'Initialize cluster') self._call_stdio('verbose', 'Call %s for %s' % (bootstrap_plugins[repository], repository)) if not bootstrap_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, cursor): + self._call_stdio('stop_loading', 'fail') self._call_stdio('error', 'Cluster init failed') break + self._call_stdio('stop_loading', 'succeed') if repository in create_tenant_plugins: create_tenant_options = Values({"variables": "ob_tcp_invited_nodes='%'"}) self._call_stdio('verbose', 'Call %s for %s' % (bootstrap_plugins[repository], repository)) @@ -1402,9 +1562,12 @@ class ObdHome(object): if not start_all: component_num -= 1 continue - + display_repositories.append(repository) + + for repository in display_repositories: + cluster_config = deploy_config.components[repository.name] self._call_stdio('verbose', 'Call %s for %s' % (display_plugins[repository], repository)) - if 
display_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, cursor): + if display_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, cmd, options, self.stdio, **connect_ret[repository]): component_num -= 1 if component_num == 0: @@ -1597,7 +1760,7 @@ class ObdHome(object): self._call_stdio('verbose', 'Call %s for %s' % (reload_plugins[repository], repository)) if not reload_plugins[repository]( deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, - cursor=cursor, new_cluster_config=new_cluster_config, repository_dir=repository.repository_dir): + cursor=cursor, new_cluster_config=new_cluster_config, repository_dir=repository.repository_dir, deploy_name=deploy.name): continue component_num -= 1 if component_num == 0: @@ -1627,6 +1790,7 @@ class ObdHome(object): self._call_stdio('start_loading', 'Get local repositories and plugins') # Get the repository repositories = self.load_local_repositories(deploy_info) + repositories = self.sort_repository_by_depend(repositories, deploy_config) # Check whether the components have the parameter plugins and apply the plugins self.search_param_plugin_and_apply(repositories, deploy_config) @@ -1856,6 +2020,9 @@ class ObdHome(object): cluster_configs = {} component_num = len(components) repositories = self.sort_repositories_by_depends(deploy_config, repositories) + repository_dir_map = {} + for repository in repositories: + repository_dir_map[repository.name] = repository.repository_dir for repository in repositories: if repository.name not in components: continue @@ -1883,7 +2050,9 @@ class ObdHome(object): display_plugin=display_plugins[repository], repository=repository, new_cluster_config=new_cluster_config, - new_clients=new_ssh_clients + new_clients=new_ssh_clients, + repository_dir_map=repository_dir_map, + deploy_name=deploy.name, ): component_num -= 1 done_repositories.append(repository) @@ -1926,6 +2095,8 @@ class ObdHome(object): new_clients=new_ssh_clients, rollback=True, bootstrap_plugin=bootstrap_plugins[repository], + repository_dir_map=repository_dir_map, + deploy_name=deploy.name ): deploy_config.update_component(cluster_config) @@ -2146,7 +2317,7 @@ class ObdHome(object): if need_restart and deploy_info.status == DeployStatus.STATUS_RUNNING: self._call_stdio('verbose', 'Call %s for %s' % (start_plugins[current_repository], repository)) setattr(options, 'without_parameter', True) - if not start_plugins[current_repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], options, self.stdio, self.home_path, repository.repository_dir) and getattr(options, 'force', False) is False: + if not start_plugins[current_repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], options, self.stdio, self.home_path, repository.repository_dir, deploy_name=deploy.name) and getattr(options, 'force', False) is False: self.install_repositories_to_servers(deploy_config, [current_repository, ], install_plugins, ssh_clients, options) return False @@ -2467,6 +2638,171 @@ class ObdHome(object): self._call_stdio('error', 'Repository(%s) existed' % tag_repository.repository_dir) return True + def _test_optimize_init(self, opts, test_name, deploy_config, cluster_config): + optimize_config_path = getattr(opts, 'optimize_config', None) + if optimize_config_path: + self._call_stdio('verbose', 'load optimize config {}'.format(optimize_config_path)) + self.optimize_manager.load_config(optimize_config_path, 
stdio=self.stdio) + else: + for component, cluster_config in deploy_config.components.items(): + self.optimize_manager.register_component(component, cluster_config.version) + self._call_stdio('verbose', 'load default optimize config for {}'.format(test_name)) + self.optimize_manager.load_default_config(test_name=test_name, stdio=self.stdio) + self._call_stdio('verbose', 'Get optimize config') + optimize_config = self.optimize_manager.optimize_config + check_options_plugin = self.plugin_manager.get_best_py_script_plugin('check_options', 'optimize', '0.1') + self._call_stdio('verbose', 'Call check options plugin for optimize') + return check_options_plugin(deploy_config.components.keys(), [], cluster_config, [], opts, self.stdio, optimize_config=optimize_config) + + @staticmethod + def _get_first_db_and_cursor_from_connect(connect_ret): + dbs = connect_ret.get_return('connect') + cursors = connect_ret.get_return('cursor') + if not dbs or not cursors: + return None, None + if isinstance(dbs, dict) and isinstance(cursors, dict): + tmp_server = list(dbs.keys())[0] + db = dbs[tmp_server] + cursor = cursors[tmp_server] + return db, cursor + else: + return dbs, cursors + + def _test_optimize_operation(self, deploy, optimize_envs, connect_context, stage=None, opts=None, operation='optimize'): + """ + + :param deploy: + :param stage: optimize stage + :param optimize_envs: envs for optimize plugin + :param connect_context: { + "<component name>": { + "db": db, + "cursor": cursor, + "connect_kwargs": { + "component": <component name>, + "target_server": "server1" # kwargs for connect plugin + } + } + } + :param operation: "optimize" or "recover" + :return: + """ + if operation == 'optimize': + self._call_stdio('verbose', 'Optimize for stage {}'.format(stage)) + elif operation == 'recover': + self._call_stdio('verbose', 'Recover the optimizes') + else: + raise Exception("Invalid optimize operation!") + deploy_config = deploy.deploy_config + ob_cursor = None + odp_cursor = None + cluster_config = None + for component in connect_context.keys(): + self._call_stdio('verbose', 'get cursor for component {}'.format(component)) + connect_context[component] = connect_context.get(component, {}) + cursor = connect_context[component].get('cursor') + db = connect_context[component].get('db') + if not cursor or not db: + self._call_stdio('verbose', 'cursor not found for component {}, try to connect'.format(component)) + connect_kwargs = connect_context[component].get('connect_kwargs', {}) + ret = self._get_connect(deploy, **connect_kwargs) + db, cursor = self._get_first_db_and_cursor_from_connect(ret) + connect_context[component]['db'] = db + cursor = connect_context[component]['cursor'] = cursor + if component in ['oceanbase', 'oceanbase-ce']: + ob_cursor = cursor + elif component in ['obproxy', 'obproxy-ce']: + odp_cursor = cursor + cluster_config = deploy_config.components[component] + operation_plugin = self.plugin_manager.get_best_py_script_plugin(operation, 'optimize', '0.1') + optimize_config = self.optimize_manager.optimize_config + kwargs = dict(optimize_config=optimize_config, stage=stage, ob_cursor=ob_cursor, odp_cursor=odp_cursor, optimize_envs=optimize_envs) + self._call_stdio('verbose', 'Call {} plugin.'.format(operation)) + ret = operation_plugin(deploy_config.components.keys(), [], cluster_config, [], opts, self.stdio, **kwargs) + if ret: + restart_components = ret.get_return('restart_components') + else: + return False + if restart_components: + self._call_stdio('verbose', 'Components {} need
restart.'.format(','.join(restart_components))) + for component in restart_components: + self._call_stdio('verbose', 'close cursor for {}'.format(component)) + connect_context[component]['cursor'].close() + connect_context[component]['db'].close() + ret = self._restart_cluster_for_optimize(deploy.name, restart_components) + if not ret: + return False + if operation == 'optimize': + for component, connect_item in connect_context.items(): + connect_kwargs = connect_item['connect_kwargs'] + self._call_stdio('verbose', 'reconnect {} by kwargs {}'.format(component, connect_kwargs)) + if connect_kwargs['component_name'] in restart_components: + ret = self._get_connect(deploy, **connect_kwargs) + if not ret: + return False + db, cursor = self._get_first_db_and_cursor_from_connect(ret) + connect_context[component]['db'] = db + connect_context[component]['cursor'] = cursor + for component in restart_components: + self._call_stdio('verbose', '{}: major freeze for component ready'.format(component)) + self._call_stdio('start_loading', 'Waiting for {} ready'.format(component)) + cursor = connect_context[component]['cursor'] + if not self._major_freeze(deploy_config, component, cursor=cursor, tenant=optimize_envs.get('tenant')): + self._call_stdio('stop_loading', 'fail') + return False + self._call_stdio('stop_loading', 'succeed') + return True + + def _major_freeze(self, deploy_config, component, **kwargs): + cluster_config = deploy_config.components[component] + major_freeze_plugin = self.plugin_manager.get_best_py_script_plugin('major_freeze', component, cluster_config.version) + if not major_freeze_plugin: + self._call_stdio('verbose', 'no major freeze plugin for component {}, skip.'.format(component)) + return True + return major_freeze_plugin(deploy_config.components.keys(), [], cluster_config, [], {}, self.stdio, **kwargs) + + def _restart_cluster_for_optimize(self, deploy_name, components): + self._call_stdio('start_loading', 'Restart cluster') + if getattr(self.stdio, 'sub_io'): + stdio = self.stdio.sub_io(msg_lv=MsgLevel.ERROR) + else: + stdio = None + obd = ObdHome(self.home_path, self.dev_mode, stdio=stdio) + obd.lock_manager.set_try_times(-1) + option = Values({'components': ','.join(components), 'without_parameter': True}) + if obd.stop_cluster(name=deploy_name, options=option) and \ + obd.start_cluster(name=deploy_name, options=option) and obd.display_cluster(name=deploy_name): + self._call_stdio('stop_loading', 'succeed') + return True + else: + self._call_stdio('stop_loading', 'fail') + return False + + def _get_connect(self, deploy, component_name, **kwargs): + deploy_config = deploy.deploy_config + cluster_config = deploy_config.components[component_name] + connect_plugin = self.plugin_manager.get_best_py_script_plugin('connect', component_name, cluster_config.version) + ret = connect_plugin(deploy_config.components.keys(), [], cluster_config, [], {}, self.stdio, **kwargs) + if not ret or not ret.get_return('connect'): + return None + return ret + + def create_mysqltest_snap(self, deploy, ssh_clients, repositories, create_snap_plugin, start_plugins, stop_plugins, options, snap_configs, env={}): + deploy_config = deploy.deploy_config + for repository in repositories: + if repository in snap_configs: + cluster_config = deploy_config.components[repository.name] + self._call_stdio('verbose', 'Call %s for %s' % (stop_plugins[repository], repository)) + if not stop_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio): + return False + 
self._call_stdio('verbose', 'Call %s for %s' % (create_snap_plugin, repository)) + if not create_snap_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env=env, snap_config=snap_configs[repository]): + return False + self._call_stdio('verbose', 'Call %s for %s' % (start_plugins[repository], repository)) + if not start_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], options, self.stdio, self.home_path, repository.repository_dir, deploy_name=deploy.name): + return False + return True + def mysqltest(self, name, opts): self._call_stdio('verbose', 'Get Deploy by name') deploy = self.deploy_manager.get_deploy_config(name) @@ -2474,9 +2810,15 @@ class ObdHome(object): self._call_stdio('error', 'No such deploy: %s.' % name) return False + fast_reboot = getattr(opts, 'fast_reboot', False) deploy_info = deploy.deploy_info self._call_stdio('verbose', 'Check deploy status') - if deploy_info.status != DeployStatus.STATUS_RUNNING: + if fast_reboot: + setattr(opts, 'without_parameter', True) + status = [DeployStatus.STATUS_DEPLOYED, DeployStatus.STATUS_RUNNING] + else: + status = [DeployStatus.STATUS_RUNNING] + if deploy_info.status not in status: self._call_stdio('print', 'Deploy "%s" is %s' % (name, deploy_info.status.value)) return False self._call_stdio('verbose', 'Get deploy configuration') @@ -2518,16 +2860,27 @@ class ObdHome(object): # Get the repository # repositories = self.get_local_repositories({opts.component: deploy_config.components[opts.component]}) repositories = self.load_local_repositories(deploy_info) + target_repository = None + ob_repository = None for repository in repositories: if repository.name == opts.component: - break - else: + target_repository = repository + if repository.name in ['oceanbase', 'oceanbase-ce']: + ob_repository = repository + + if not target_repository: self._call_stdio('error', 'Can not find the component for mysqltest, use `--component` to select component') return False + if not ob_repository: + self._call_stdio('error', 'Deploy {} must contain the component oceanbase or oceanbase-ce.'.format(deploy.name)) + return False # Check whether the components have the parameter plugins and apply the plugins self.search_param_plugin_and_apply(repositories, deploy_config) self._call_stdio('stop_loading', 'succeed') + if deploy_info.status == DeployStatus.STATUS_DEPLOYED and not self._start_cluster(deploy, repositories): + return False + # Get the client ssh_clients = self.get_clients(deploy_config, repositories) @@ -2544,29 +2897,39 @@ class ObdHome(object): self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) return False - connect_plugin = self.search_py_script_plugin(repositories, 'connect')[repository] + connect_plugin = self.search_py_script_plugin(repositories, 'connect')[target_repository] ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, target_server=opts.test_server, sys_root=False) if not ret or not ret.get_return('connect'): return False db = ret.get_return('connect') cursor = ret.get_return('cursor') - - mysqltest_init_plugin = self.plugin_manager.get_best_py_script_plugin('init', 'mysqltest', repository.version) - mysqltest_check_opt_plugin = self.plugin_manager.get_best_py_script_plugin('check_opt', 'mysqltest', repository.version) - mysqltest_check_test_plugin = self.plugin_manager.get_best_py_script_plugin('check_test', 'mysqltest', repository.version) - mysqltest_run_test_plugin = 
self.plugin_manager.get_best_py_script_plugin('run_test', 'mysqltest', repository.version) - mysqltest_collect_log_plugin = self.plugin_manager.get_best_py_script_plugin('collect_log', 'mysqltest', repository.version) - env = opts.__dict__ env['cursor'] = cursor env['host'] = opts.test_server.ip env['port'] = db.port - self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_check_opt_plugin, repository)) + + mysqltest_init_plugin = self.plugin_manager.get_best_py_script_plugin('init', 'mysqltest', ob_repository.version) + mysqltest_check_opt_plugin = self.plugin_manager.get_best_py_script_plugin('check_opt', 'mysqltest', ob_repository.version) + mysqltest_check_test_plugin = self.plugin_manager.get_best_py_script_plugin('check_test', 'mysqltest', ob_repository.version) + mysqltest_run_test_plugin = self.plugin_manager.get_best_py_script_plugin('run_test', 'mysqltest', ob_repository.version) + mysqltest_collect_log_plugin = self.plugin_manager.get_best_py_script_plugin('collect_log', 'mysqltest', ob_repository.version) + + start_plugins = self.search_py_script_plugin(repositories, 'start') + stop_plugins = self.search_py_script_plugin(repositories, 'stop') + # display_plugin = self.search_py_script_plugin(repositories, 'display')[repository] + + if fast_reboot: + create_snap_plugin = self.plugin_manager.get_best_py_script_plugin('create_snap', 'general', '0.1') + load_snap_plugin = self.plugin_manager.get_best_py_script_plugin('load_snap', 'general', '0.1') + snap_check_plugin = self.plugin_manager.get_best_py_script_plugin('snap_check', 'general', '0.1') + snap_configs = self.search_plugins(repositories, PluginType.SNAP_CONFIG, no_found_exit=False) + + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_check_opt_plugin, target_repository)) ret = mysqltest_check_opt_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) if not ret: return False if not env['init_only']: - self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_check_test_plugin, repository)) + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_check_test_plugin, target_repository)) ret = mysqltest_check_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) if not ret: self._call_stdio('error', 'Failed to get test set') @@ -2575,23 +2938,61 @@ class ObdHome(object): self._call_stdio('error', 'Test set is empty') return False + use_snap = False if env['need_init'] or env['init_only']: - self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_init_plugin, repository)) + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_init_plugin, target_repository)) if not mysqltest_init_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env): self._call_stdio('error', 'Failed to init for mysqltest') return False + if fast_reboot: + if not self.create_mysqltest_snap(deploy, ssh_clients, repositories, create_snap_plugin, start_plugins, stop_plugins, opts, snap_configs, env): + return False + connect_plugin = self.search_py_script_plugin(repositories, 'connect')[target_repository] + ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, target_server=opts.test_server, sys_root=False) + if not ret or not ret.get_return('connect'): + return False + db = ret.get_return('connect') + cursor = ret.get_return('cursor') + env['cursor'] = cursor + env['host'] = opts.test_server.ip + env['port'] = db.port + self._call_stdio('start_loading', 'Check init') + 
env['load_snap'] = True + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_init_plugin, target_repository)) + mysqltest_init_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) + env['load_snap'] = False + self._call_stdio('stop_loading', 'succeed') + use_snap = True + if env['init_only']: return True + if fast_reboot and use_snap is False: + self._call_stdio('start_loading', 'Check init') + env['load_snap'] = True + mysqltest_init_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) + env['load_snap'] = False + self._call_stdio('stop_loading', 'succeed') + snap_num = 0 + for repository in repositories: + if repository in snap_configs: + cluster_config = deploy_config.components[repository.name] + self._call_stdio('verbose', 'Call %s for %s' % (snap_check_plugin, repository)) + if not snap_check_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env=env, snap_config=snap_configs[repository]): + break + snap_num += 1 + use_snap = len(snap_configs) == snap_num + env['load_snap'] = use_snap + self._call_stdio('verbose', 'test set: {}'.format(env['test_set'])) self._call_stdio('verbose', 'total: {}'.format(len(env['test_set']))) reboot_success = True while True: - self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_run_test_plugin, repository)) + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_run_test_plugin, target_repository)) ret = mysqltest_run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) if not ret: break - self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_collect_log_plugin, repository)) + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_collect_log_plugin, target_repository)) mysqltest_collect_log_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) if ret.get_return('finished'): @@ -2609,19 +3010,36 @@ class ObdHome(object): while reboot_retries and not reboot_success: reboot_retries -= 1 with timeout(reboot_timeout): - self._call_stdio('start_loading', 'Reboot') - obd = ObdHome(self.home_path, self.dev_mode, stdio=stdio) - obd.lock_manager.set_try_times(-1) - if obd.redeploy_cluster( - name, - opt=Values({'force_kill': True, 'force': True, 'force_delete': True}), search_repo=False): - self._call_stdio('stop_loading', 'succeed') + if use_snap: + self._call_stdio('start_loading', 'Snap Reboot') + for repository in repositories: + if repository in snap_configs: + cluster_config = deploy_config.components[repository.name] + self._call_stdio('verbose', 'Call %s for %s' % (stop_plugins[repository], repository)) + if not stop_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, stdio): + self._call_stdio('stop_loading', 'fail') + continue + self._call_stdio('verbose', 'Call %s for %s' % (load_snap_plugin, repository)) + if not load_snap_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, stdio, env=env, snap_config=snap_configs[repository]): + self._call_stdio('stop_loading', 'fail') + continue + if not start_plugins[repository](deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, stdio, self.home_path, repository.repository_dir, deploy_name=deploy.name): + self._call_stdio('stop_loading', 'fail') + continue else: - self._call_stdio('stop_loading', 'fail') - continue - obd.lock_manager.set_try_times(6000) - obd = None - connect_plugin = 
self.search_py_script_plugin(repositories, 'connect')[repository] + self._call_stdio('start_loading', 'Reboot') + obd = ObdHome(self.home_path, self.dev_mode, stdio=stdio) + obd.lock_manager.set_try_times(-1) + if not obd.redeploy_cluster( + name, + opt=Values({'force_kill': True, 'force': True, 'force_delete': True}), search_repo=False): + self._call_stdio('stop_loading', 'fail') + continue + obd.lock_manager.set_try_times(6000) + obd = None + + self._call_stdio('stop_loading', 'succeed') + connect_plugin = self.search_py_script_plugin(repositories, 'connect')[target_repository] ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, target_server=opts.test_server, sys_root=False) if not ret or not ret.get_return('connect'): @@ -2630,9 +3048,25 @@ class ObdHome(object): db = ret.get_return('connect') cursor = ret.get_return('cursor') env['cursor'] = cursor - self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_init_plugin, repository)) + + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_init_plugin, target_repository)) if mysqltest_init_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env): + if fast_reboot and use_snap is False: + if not self.create_mysqltest_snap(deploy, ssh_clients, repositories, create_snap_plugin, start_plugins, stop_plugins, opts, snap_configs, env): + return False + use_snap = True + connect_plugin = self.search_py_script_plugin(repositories, 'connect')[target_repository] + ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, + self.stdio, target_server=opts.test_server, sys_root=False) + if not ret or not ret.get_return('connect'): + self._call_stdio('error', 'Failed to connect server') + continue + db = ret.get_return('connect') + cursor = ret.get_return('cursor') + env['cursor'] = cursor + self._call_stdio('verbose', 'Call %s for %s' % (mysqltest_init_plugin, target_repository)) + mysqltest_init_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, env) reboot_success = True else: self._call_stdio('error', 'Failed to prepare for mysqltest') @@ -2723,69 +3157,90 @@ class ObdHome(object): ssh_clients = self.get_clients(deploy_config, repositories) # Check the status for the deployed cluster - component_status = {} - cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) - if cluster_status is False or cluster_status == 0: - if self.stdio: - self._call_stdio('error', EC_SOME_SERVER_STOPED) - for repository in component_status: - cluster_status = component_status[repository] - for server in cluster_status: - if cluster_status[server] == 0: - self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) - return False + if not getattr(opts, 'skip_cluster_status_check', False): + component_status = {} + cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) + if cluster_status is False or cluster_status == 0: + if self.stdio: + self._call_stdio('error', EC_SOME_SERVER_STOPED) + for repository in component_status: + cluster_status = component_status[repository] + for server in cluster_status: + if cluster_status[server] == 0: + self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) + return False ob_repository = None repository = None + env = {'sys_root': False} + odp_db = None + odp_cursor = None + ob_component = None + connect_context = {} for tmp_repository in 
repositories: if tmp_repository.name in ["oceanbase", "oceanbase-ce"]: ob_repository = tmp_repository + ob_component = tmp_repository.name if tmp_repository.name == opts.component: repository = tmp_repository - + if tmp_repository.name in ['obproxy', 'obproxy-ce']: + odp_component = tmp_repository.name + allow_components = ['oceanbase', 'oceanbase-ce'] + for component_name in deploy_config.components: + if component_name in allow_components: + config = deploy_config.components[component_name] + env['user'] = 'root' + env['password'] = config.get_global_conf().get('root_password', '') + env['target_server'] = opts.test_server + break + connect_kwargs = dict(component_name=odp_component, target_server=opts.test_server) + ret = self._get_connect(deploy, **connect_kwargs) + if not ret or not ret.get_return('connect'): + return False + odp_db, odp_cursor = self._get_first_db_and_cursor_from_connect(ret) + connect_context[tmp_repository.name] = {'connect_kwargs': connect_kwargs, 'db': odp_db, + 'cursor': odp_cursor} + if not ob_repository: + self._call_stdio('error', 'Deploy {} must contain the component oceanbase or oceanbase-ce.'.format(deploy.name)) + return False plugin_version = ob_repository.version if ob_repository else repository.version - env = {'sys_root': False} - db = None - cursor = None - odp_db = None - odp_cursor = None - ob_optimization = True - - connect_plugin = self.search_py_script_plugin(repositories, 'connect')[repository] - - if repository.name in ['obproxy', 'obproxy-ce']: - ob_optimization = False - allow_components = ['oceanbase', 'oceanbase-ce'] - for component_name in deploy_config.components: - if component_name in allow_components: - config = deploy_config.components[component_name] - env['user'] = 'root' - env['password'] = config.get_global_conf().get('root_password', '') - ob_optimization = True - break - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, target_server=opts.test_server) - if not ret or not ret.get_return('connect'): - return False - odp_db = ret.get_return('connect') - odp_cursor = ret.get_return('cursor') - - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, target_server=opts.test_server, **env) + connect_kwargs = dict(component_name=repository.name, **env) + ret = self._get_connect(deploy=deploy, **connect_kwargs) if not ret or not ret.get_return('connect'): return False - db = ret.get_return('connect') - cursor = ret.get_return('cursor') + db, cursor = self._get_first_db_and_cursor_from_connect(ret) + connect_context[ob_component] = {'connect_kwargs': connect_kwargs, 'db': db, 'cursor': cursor} + pre_test_plugin = self.plugin_manager.get_best_py_script_plugin('pre_test', 'sysbench', plugin_version) run_test_plugin = self.plugin_manager.get_best_py_script_plugin('run_test', 'sysbench', plugin_version) setattr(opts, 'host', opts.test_server.ip) setattr(opts, 'port', db.port) - setattr(opts, 'ob_optimization', ob_optimization) - self._call_stdio('verbose', 'Call %s for %s' % (run_test_plugin, repository)) - if run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, db, cursor, odp_db, odp_cursor): - return True - return False + optimization = getattr(opts, 'optimization', 0) + + self._call_stdio('verbose', 'Call %s for %s' % (pre_test_plugin, repository)) + ret = pre_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor=cursor) + if not ret: + return 
False + kwargs = ret.kwargs + optimization_init = False + try: + if optimization: + if not self._test_optimize_init(opts=opts, test_name='sysbench', deploy_config=deploy_config, cluster_config=cluster_config): + return False + optimization_init = True + if not self._test_optimize_operation(deploy=deploy, stage='test', opts=opts, connect_context=connect_context, optimize_envs=kwargs): + return False + self._call_stdio('verbose', 'Call %s for %s' % (run_test_plugin, repository)) + if run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio): + return True + + return False + finally: + if optimization and optimization_init: + self._test_optimize_operation(deploy=deploy, connect_context=connect_context, optimize_envs=kwargs, operation='recover') def tpch(self, name, opts): self._call_stdio('verbose', 'Get Deploy by name') @@ -2842,25 +3297,28 @@ class ObdHome(object): # Get the client ssh_clients = self.get_clients(deploy_config, repositories) - # Check the status for the deployed cluster - component_status = {} - cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) - if cluster_status is False or cluster_status == 0: - if self.stdio: - self._call_stdio('error', EC_SOME_SERVER_STOPED) - for repository in component_status: - cluster_status = component_status[repository] - for server in cluster_status: - if cluster_status[server] == 0: - self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) - return False + if not getattr(opts, 'skip_cluster_status_check', False): + # Check the status for the deployed cluster + component_status = {} + cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) + if cluster_status is False or cluster_status == 0: + if self.stdio: + self._call_stdio('error', EC_SOME_SERVER_STOPED) + for repository in component_status: + cluster_status = component_status[repository] + for server in cluster_status: + if cluster_status[server] == 0: + self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) + return False - connect_plugin = self.search_py_script_plugin(repositories, 'connect')[repository] - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, target_server=opts.test_server) + connect_context = {} + connect_kwargs = dict(component_name=repository.name, target_server=opts.test_server) + ret = self._get_connect(deploy=deploy, **connect_kwargs) if not ret or not ret.get_return('connect'): return False db = ret.get_return('connect') cursor = ret.get_return('cursor') + connect_context[repository.name] = {'connect_kwargs': connect_kwargs, 'db': db, 'cursor': cursor} pre_test_plugin = self.plugin_manager.get_best_py_script_plugin('pre_test', 'tpch', repository.version) run_test_plugin = self.plugin_manager.get_best_py_script_plugin('run_test', 'tpch', repository.version) @@ -2868,13 +3326,31 @@ class ObdHome(object): setattr(opts, 'host', opts.test_server.ip) setattr(opts, 'port', db.port) + optimization = getattr(opts, 'optimization', 0) self._call_stdio('verbose', 'Call %s for %s' % (pre_test_plugin, repository)) - if pre_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio): + ret = pre_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor=cursor) + if not ret: + return False + kwargs = ret.kwargs + optimization_init = False + try: + if optimization: + if not 
self._test_optimize_init(opts=opts, test_name='tpch', deploy_config=deploy_config, cluster_config=cluster_config): + return False + optimization_init = True + if not self._test_optimize_operation(deploy=deploy, stage='test', opts=opts, connect_context=connect_context, optimize_envs=kwargs): + return False self._call_stdio('verbose', 'Call %s for %s' % (run_test_plugin, repository)) - if run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, db, cursor): + if run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, db, cursor, **kwargs): return True - return False + return False + except Exception as e: + self._call_stdio('error', e) + return False + finally: + if optimization and optimization_init: + self._test_optimize_operation(deploy=deploy, connect_context=connect_context, optimize_envs=kwargs, operation='recover') def update_obd(self, version, install_prefix='/'): self._obd_update_lock() @@ -2925,7 +3401,7 @@ class ObdHome(object): if opts.component not in deploy_config.components: self._call_stdio('error', 'Can not find the component for tpcds, use `--component` to select component') return False - + for component_name in db_components: if component_name in deploy_config.components: db_component = component_name @@ -3030,7 +3506,6 @@ class ObdHome(object): self._call_stdio('start_loading', 'Get local repositories and plugins') # Get the repository - # repositories = self.get_local_repositories({opts.component: deploy_config.components[opts.component]}) repositories = self.load_local_repositories(deploy_info) # Check whether the components have the parameter plugins and apply the plugins @@ -3041,143 +3516,90 @@ class ObdHome(object): ssh_clients = self.get_clients(deploy_config, repositories) # Check the status for the deployed cluster - component_status = {} - cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) - if cluster_status is False or cluster_status == 0: - if self.stdio: - self._call_stdio('error', EC_SOME_SERVER_STOPED) - for repository in component_status: - cluster_status = component_status[repository] - for server in cluster_status: - if cluster_status[server] == 0: - self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) - return False + if not getattr(opts, 'skip_cluster_status_check', False): + component_status = {} + cluster_status = self.cluster_status_check(ssh_clients, deploy_config, repositories, component_status) + if cluster_status is False or cluster_status == 0: + if self.stdio: + self._call_stdio('error', EC_SOME_SERVER_STOPED) + for repository in component_status: + cluster_status = component_status[repository] + for server in cluster_status: + if cluster_status[server] == 0: + self._call_stdio('print', '%s %s is stopped' % (server, repository.name)) + return False ob_repository = None repository = None + env = {} + odp_cursor = None + ob_component = None + odp_component = None + connect_context = {} for tmp_repository in repositories: if tmp_repository.name in ["oceanbase", "oceanbase-ce"]: ob_repository = tmp_repository + ob_component = tmp_repository.name if tmp_repository.name == opts.component: repository = tmp_repository - + if tmp_repository.name in ['obproxy', 'obproxy-ce']: + odp_component = tmp_repository.name + allow_components = ['oceanbase', 'oceanbase-ce'] + for component in deploy_info.components: + if component in allow_components: + config = deploy_config.components[component] + 
env['user'] = 'root' + env['password'] = config.get_global_conf().get('root_password', '') + env['target_server'] = opts.test_server + break + connect_kwargs = dict(component_name=odp_component, target_server=opts.test_server) + ret = self._get_connect(deploy, **connect_kwargs) + if not ret or not ret.get_return('connect'): + return False + odp_db, odp_cursor = self._get_first_db_and_cursor_from_connect(ret) + connect_context[odp_component] = {'connect_kwargs': connect_kwargs, 'db': odp_db, 'cursor': odp_cursor} + if not ob_repository: + self._call_stdio('error', 'Deploy {} must contain the component oceanbase or oceanbase-ce.'.format(deploy.name)) + return False plugin_version = ob_repository.version if ob_repository else repository.version - - env = {'sys_root': False} - odp_db = None - odp_cursor = None - ob_optimization = True - ob_component = None - odp_component = None - # ob_cluster_config = None - - connect_plugin = self.search_py_script_plugin(repositories, 'connect')[repository] - - if repository.name in ['obproxy', 'obproxy-ce']: - odp_component = repository.name - ob_optimization = False - allow_components = ['oceanbase', 'oceanbase-ce'] - for component in deploy_info.components: - if component in allow_components: - ob_component = component - config = deploy_config.components[component] - env['user'] = 'root' - env['password'] = config.get_global_conf().get('root_password', '') - ob_optimization = True - break - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, - target_server=opts.test_server) - if not ret or not ret.get_return('connect'): - return False - odp_db = ret.get_return('connect') - odp_cursor = ret.get_return('cursor') - # ob_cluster_config = deploy_config.components[ob_component] - else: - ob_component = opts.component - # ob_cluster_config = cluster_config - - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, self.stdio, - target_server=opts.test_server, **env) + connect_kwargs = dict(component_name=repository.name, **env) + ret = self._get_connect(deploy=deploy, **connect_kwargs) if not ret or not ret.get_return('connect'): return False - db = ret.get_return('connect') - cursor = ret.get_return('cursor') + db, cursor = self._get_first_db_and_cursor_from_connect(ret) + connect_context[ob_component] = {'connect_kwargs': connect_kwargs, 'db': db, 'cursor': cursor} + pre_test_plugin = self.plugin_manager.get_best_py_script_plugin('pre_test', 'tpcc', plugin_version) - optimize_plugin = self.plugin_manager.get_best_py_script_plugin('optimize', 'tpcc', plugin_version) build_plugin = self.plugin_manager.get_best_py_script_plugin('build', 'tpcc', plugin_version) run_test_plugin = self.plugin_manager.get_best_py_script_plugin('run_test', 'tpcc', plugin_version) - recover_plugin = self.plugin_manager.get_best_py_script_plugin('recover', 'tpcc', plugin_version) setattr(opts, 'host', opts.test_server.ip) setattr(opts, 'port', db.port) - setattr(opts, 'ob_optimization', ob_optimization) kwargs = {} - optimized = False optimization = getattr(opts, 'optimization', 0) test_only = getattr(opts, 'test_only', False) - components = [] - if getattr(self.stdio, 'sub_io'): - stdio = self.stdio.sub_io() - else: - stdio = None - obd = None + optimization_inited = False try: self._call_stdio('verbose', 'Call %s for %s' % (pre_test_plugin, repository)) ret = pre_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, - cursor, odp_cursor, **kwargs) + 
cursor=cursor, odp_cursor=odp_cursor, **kwargs) if not ret: return False else: kwargs.update(ret.kwargs) if optimization: - optimized = True - kwargs['optimization_step'] = 'build' - self._call_stdio('verbose', 'Call %s for %s' % (optimize_plugin, repository)) - ret = optimize_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor, - odp_cursor, **kwargs) - if not ret: + if not self._test_optimize_init(opts=opts, test_name='tpcc', deploy_config=deploy_config, cluster_config=cluster_config): + return False + optimization_inited = True + if not self._test_optimize_operation(deploy=deploy, stage='build', opts=opts, connect_context=connect_context, optimize_envs=kwargs): return False - else: - kwargs.update(ret.kwargs) - if kwargs.get('odp_need_reboot') and odp_component: - components.append(odp_component) - if kwargs.get('obs_need_reboot') and ob_component: - components.append(ob_component) - if components: - db.close() - cursor.close() - if odp_db: - odp_db.close() - if odp_cursor: - odp_cursor.close() - self._call_stdio('start_loading', 'Restart cluster') - obd = ObdHome(self.home_path, self.dev_mode, stdio=stdio) - obd.lock_manager.set_try_times(-1) - option = Values({'components': ','.join(components), 'without_parameter': True}) - if obd.stop_cluster(name=name, options=option) and obd.start_cluster(name=name, options=option) and obd.display_cluster(name=name): - self._call_stdio('stop_loading', 'succeed') - else: - self._call_stdio('stop_loading', 'fail') - return False - if repository.name in ['obproxy', 'obproxy-ce']: - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, - self.stdio, - target_server=opts.test_server) - if not ret or not ret.get_return('connect'): - return False - odp_db = ret.get_return('connect') - odp_cursor = ret.get_return('cursor') - ret = connect_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], {}, - self.stdio, - target_server=opts.test_server, **env) - if not ret or not ret.get_return('connect'): - return False - db = ret.get_return('connect') - cursor = ret.get_return('cursor') if not test_only: self._call_stdio('verbose', 'Call %s for %s' % (build_plugin, repository)) + cursor = connect_context[ob_component]['cursor'] + if odp_component: + odp_cursor = connect_context[odp_component]['cursor'] ret = build_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor, odp_cursor, **kwargs) if not ret: @@ -3185,17 +3607,12 @@ class ObdHome(object): else: kwargs.update(ret.kwargs) if optimization: - kwargs['optimization_step'] = 'test' - self._call_stdio('verbose', 'Call %s for %s' % (optimize_plugin, repository)) - ret = optimize_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor, - odp_cursor, **kwargs) + ret = self._test_optimize_operation(deploy=deploy, stage='test', opts=opts, connect_context=connect_context, optimize_envs=kwargs) if not ret: return False - else: - kwargs.update(ret.kwargs) self._call_stdio('verbose', 'Call %s for %s' % (run_test_plugin, repository)) - ret = run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor, - odp_cursor, **kwargs) + cursor = connect_context[ob_component]['cursor'] + ret = run_test_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, cursor, **kwargs) if not ret: return False else: @@ -3205,22 +3622,8 @@ class ObdHome(object): 
self._call_stdio('error', e) return False finally: - if optimization and optimized: - self._call_stdio('verbose', 'Call %s for %s' % (recover_plugin, repository)) - if not recover_plugin(deploy_config.components.keys(), ssh_clients, cluster_config, [], opts, self.stdio, - cursor, odp_cursor, **kwargs): - return False - if components and obd: - self._call_stdio('start_loading', 'Restart cluster') - option = Values({'components': ','.join(components), 'without_parameter': True}) - if obd.stop_cluster(name=name, options=option) and obd.start_cluster(name=name, options=option): - self._call_stdio('stop_loading', 'succeed') - else: - self._call_stdio('stop_loading', 'fail') - if db: - db.close() - if odp_db: - odp_db.close() + if optimization and optimization_inited: + self._test_optimize_operation(deploy=deploy, connect_context=connect_context, optimize_envs=kwargs, operation='recover') def db_connect(self, name, opts): self._call_stdio('verbose', 'Get Deploy by name') @@ -3247,7 +3650,7 @@ class ObdHome(object): self._call_stdio('error', '%s not support. %s is allowed' % (opts.component, allow_components)) return False if opts.component not in deploy_config.components: - self._call_stdio('error', 'Can not find the component for tpch, use `--component` to select component') + self._call_stdio('error', 'Can not find the component for db connect, use `--component` to select component') return False cluster_config = deploy_config.components[opts.component] @@ -3266,7 +3669,8 @@ class ObdHome(object): return False self._call_stdio('start_loading', 'Get local repositories and plugins') # Get the repository - repositories = self.load_local_repositories(deploy_info) + repositories = self.get_local_repositories({opts.component: deploy_config.components[opts.component]}) + # Check whether the components have the parameter plugins and apply the plugins self.search_param_plugin_and_apply(repositories, deploy_config) self._call_stdio('stop_loading', 'succeed') @@ -3322,3 +3726,70 @@ class ObdHome(object): results = context.get('results', []) self._call_stdio("print_list", results, ["Component", "Server", cmd_name.title()], title=cmd_name.title()) return not context.get('failed') + + def dooba(self, name, opts): + self._call_stdio('verbose', 'Get Deploy by name') + deploy = self.deploy_manager.get_deploy_config(name, read_only=True) + if not deploy: + self._call_stdio('error', 'No such deploy: %s.' % name) + return False + + self._call_stdio('verbose', 'Get deploy configuration') + deploy_config = deploy.deploy_config + deploy_info = deploy.deploy_info + + if deploy_info.status in (DeployStatus.STATUS_DESTROYED, DeployStatus.STATUS_CONFIGURED): + self._call_stdio('print', 'Deploy "%s" is %s' % (name, deploy_info.status.value)) + return False + + allow_components = ['obproxy', 'obproxy-ce', 'oceanbase', 'oceanbase-ce'] + if opts.component is None: + for component_name in allow_components: + if component_name in deploy_config.components: + opts.component = component_name + break + elif opts.component not in allow_components: + self._call_stdio('error', '%s not support. 
%s is allowed' % (opts.component, allow_components)) + return False + if opts.component not in deploy_config.components: + self._call_stdio('error', + 'Can not find the component for dooba, use `--component` to select component') + return False + + for component in deploy_config.components: + if component in ['oceanbase', 'oceanbase-ce']: + break + else: + self._call_stdio('error', 'Dooba requires the deployment to contain the component oceanbase or oceanbase-ce.') + return False + + cluster_config = deploy_config.components[opts.component] + if not cluster_config.servers: + self._call_stdio('error', '%s server list is empty' % opts.component) + return False + if opts.server is None: + opts.server = cluster_config.servers[0] + else: + for server in cluster_config.servers: + if server.name == opts.server: + opts.server = server + break + else: + self._call_stdio('error', '%s is not a server in %s' % (opts.server, opts.component)) + return False + self._call_stdio('start_loading', 'Get local repositories and plugins') + # Get the repository + repositories = self.load_local_repositories(deploy_info) + plugin_version = None + for repository in repositories: + if repository.name in ['oceanbase', 'oceanbase-ce']: + plugin_version = repository.version + break + # Check whether the components have the parameter plugins and apply the plugins + self.search_param_plugin_and_apply(repositories, deploy_config) + self._call_stdio('stop_loading', 'succeed') + + sync_config_plugin = self.plugin_manager.get_best_py_script_plugin('sync_cluster_config', 'general', '0.1') + sync_config_plugin(deploy_config.components.keys(), [], cluster_config, [], opts, self.stdio) + dooba_plugin = self.plugin_manager.get_best_py_script_plugin('run', 'dooba', plugin_version) + return dooba_plugin(deploy_config.components.keys(), [], cluster_config, [], opts, self.stdio) \ No newline at end of file diff --git a/docs/.menu_map.yml b/docs/.menu_map.yml new file mode 100644 index 0000000000000000000000000000000000000000..71cca576339f3b294620825bfccc0d7d6bca7d64 --- /dev/null +++ b/docs/.menu_map.yml @@ -0,0 +1,3 @@ +3.user-guide=使用指南 +3.obd-command=OBD 命令 +5.faq=常见问题 diff --git a/docs/.menu_map_en.yml b/docs/.menu_map_en.yml new file mode 100644 index 0000000000000000000000000000000000000000..264ff28d874a34132ce8e94fe273b8dce098806a --- /dev/null +++ b/docs/.menu_map_en.yml @@ -0,0 +1,2 @@ +3.user-guide=User Guide +3.obd-command=OBD command \ No newline at end of file diff --git a/docs/en-US/3.user-guide/2.start-the-oceanbase-cluster-by-using-obd.md b/docs/en-US/3.user-guide/2.start-the-oceanbase-cluster-by-using-obd.md index bf1979887fc189bf016b9bb2d230849a6913032a..674fb086748010e68b9fb6a309908f0a5f87a22d 100644 --- a/docs/en-US/3.user-guide/2.start-the-oceanbase-cluster-by-using-obd.md +++ b/docs/en-US/3.user-guide/2.start-the-oceanbase-cluster-by-using-obd.md @@ -4,30 +4,39 @@ To start an OceanBase cluster, follow these steps: ## Step 1: Select a configuration file -Select a configuration file based on your resource configurations: +OBD provides different configuration files for different deployment scenarios. These sample configuration files are located in the directory `/usr/obd/example/`. Select a configuration file based on your resource configurations: ### Small-scale deployment mode This deployment mode applies to personal devices with at least 8 GB of memory.
-- [Sample configuration file for local single-node deployment](https://github.com/oceanbase/obdeploy/blob/master/example/mini-local-example.yaml) -- [Sample configuration file for single-node deployment](https://github.com/oceanbase/obdeploy/blob/master/example/mini-single-example.yaml) -- [Sample configuration file for three-node deployment](https://github.com/oceanbase/obdeploy/blob/master/example/mini-distributed-example.yaml) -- [Sample configuration file for single-node deployment with ODP](https://github.com/oceanbase/obdeploy/blob/master/example/mini-single-with-obproxy-example.yaml) -- [Sample configuration file for three-node deployment with ODP](https://github.com/oceanbase/obdeploy/blob/master/example/mini-distributed-with-obproxy-example.yaml) +- Sample configuration file for local single-node deployment: /usr/obd/example/mini-local-example.yaml + +- Sample configuration file for single-node deployment: /usr/obd/example/mini-single-example.yaml + +- Sample configuration file for three-node deployment: /usr/obd/example/mini-distributed-example.yaml + +- Sample configuration file for single-node deployment with ODP: /usr/obd/example/mini-single-with-obproxy-example.yaml + +- Sample configuration file for three-node deployment with ODP: /usr/obd/example/mini-distributed-with-obproxy-example.yaml ### Professional deployment mode This deployment mode applies to advanced Elastic Compute Service (ECS) instances or physical servers with at least 16 CPU cores and 64 GB of memory. -- [Sample configuration file for local single-node deployment](https://github.com/oceanbase/obdeploy/blob/master/example/local-example.yaml) -- [Sample configuration file for single-node deployment](https://github.com/oceanbase/obdeploy/blob/master/example/single-example.yaml) -- [Sample configuration file for three-node deployment](https://github.com/oceanbase/obdeploy/blob/master/example/distributed-example.yaml) -- [Sample configuration file for single-node deployment with ODP](https://github.com/oceanbase/obdeploy/blob/master/example/single-with-obproxy-example.yaml) -- [Sample configuration file for three-node deployment with ODP](https://github.com/oceanbase/obdeploy/blob/master/example/distributed-with-obproxy-example.yaml) -- [Sample configuration file for three-node deployment with ODP and obagent](https://github.com/oceanbase/obdeploy/blob/master/example/obagent/distributed-with-obproxy-and-obagent-example.yaml) +- Sample configuration file for local single-node deployment: /usr/obd/example/local-example.yaml + +- Sample configuration file for single-node deployment: /usr/obd/example/single-example.yaml -This section describes how to start a local single-node OceanBase cluster by using the [sample configuration file for local single-node deployment in the small-scale deployment mode](https://github.com/oceanbase/obdeploy/blob/master/example/mini-local-example.yaml). 
+- Sample configuration file for three-node deployment: /usr/obd/example/distributed-example.yaml + +- Sample configuration file for single-node deployment with ODP: /usr/obd/example/single-with-obproxy-example.yaml + +- Sample configuration file for three-node deployment with ODP: /usr/obd/example/distributed-with-obproxy-example.yaml + +- Sample configuration file for three-node deployment with ODP and obagent: /usr/obd/example/obagent/distributed-with-obproxy-and-obagent-example.yaml + +This section describes how to start a local single-node OceanBase cluster by using the sample configuration file for local single-node deployment in the small-scale deployment mode: /usr/obd/example/mini-local-example.yaml. ```shell # Modify the working directory of the OceanBase cluster: home_path. @@ -47,7 +56,10 @@ user: ``` `username` specifies the username used to log on to the target server. Make sure that your username has the write permission on the `home_path` directory. `password` and `key_file` are used to authenticate the user. Generally, only one of them is required. -> **NOTE:** After you specify the path of the key, add an annotation to the `password` field or delete it if your key does not require a password. Otherwise, `password` will be deemed as the password of the key and used for login, leading to a logon verification failure. + +> **NOTE:** +> +> After you specify the path of the key, comment out or delete the `password` field if your key does not require a password. Otherwise, the value of `password` will be treated as the passphrase of the key and used for logon, leading to an authentication failure. ## Step 2: Deploy and start a cluster diff --git a/docs/en-US/3.user-guide/3.obd-command/1.cluster-command-groups.md b/docs/en-US/3.user-guide/3.obd-command/1.cluster-command-groups.md index 64aa0d3399bd1f962b0cc6d6f9b2b9790bbbd37c..a174ef0f969664c4f43664c3bc74338c14d1d29a 100644 --- a/docs/en-US/3.user-guide/3.obd-command/1.cluster-command-groups.md +++ b/docs/en-US/3.user-guide/3.obd-command/1.cluster-command-groups.md @@ -181,12 +181,28 @@ obd cluster upgrade <deploy name> -c <component name> -V <version> [tags] | Option | Required | Data type | Default value | Description | --- | --- | --- |--- |--- -c/--component | Yes | string | empty | The component name you want to upgrade. --V/--version | Yes | string | The target upgrade version number. +-V/--version | Yes | string | empty | The target upgrade version number. --skip-check | No | bool | false | Skip check. --usable | No | string | empty | The hash list for the mirrors that you use during upgrade. Separated with `,`. --disable | No | string | empty | The hash list for the mirrors that you disable during upgrade. Separated with `,`. -e/--executer-path | No | string | /usr/obd/lib/executer | The executer path for the upgrade script. +## `obd cluster reinstall` + +You can run this command to reinstall the repository of a deployed component. The new repository must have the same version number as the previous repository. If you run this command while the deployment is in the `running` state, the component is restarted with the new repository after the replacement, without reloading parameters. + +```bash +obd cluster reinstall <deploy name> -c <component name> --hash <hash> [-f/--force] +``` + +The `deploy name` parameter indicates the name of the deployed cluster, which is also the alias of the configuration file. 
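A hedged usage sketch (the deploy name `demo` is a hypothetical placeholder, and `<hash>` stands for the hash of the same-version target repository; the supported options are listed in the table below):

```shell
# Replace the oceanbase-ce repository of the deployment named "demo" with a
# same-version repository identified by <hash>; -f forces the replacement
# even if the subsequent restart fails.
obd cluster reinstall demo -c oceanbase-ce --hash <hash> -f
```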
+ +| Option name | Required | Data type | Default value | Description | +|---------|----------|-------------|-------------|--------------| +| -c/--component | Yes | string | empty | The name of the component whose repository is to be replaced. | +| --hash | Yes | string | empty | The hash of the target repository. It must be of the same version as the current repository. | +| -f/--force | No | bool | false | Specifies whether to force the replacement even if the component fails to restart. | + ## `obd cluster tenant create` Creates a tenant. This command applies only to an OceanBase cluster. This command automatically creates resource units and resource pools. @@ -232,3 +248,41 @@ obd cluster tenant drop <deploy name> [-n <tenant name>] `deploy name` specifies the name of the deployment configuration file. `-n` is `--tenant-name`. This option specifies the name of the tenant to be deleted. This option is required. + +## `obd cluster chst` + +You can run this command to change the configuration style. + +```shell +obd cluster chst <deploy name> --style
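# (Editor's note: the page is truncated above; the lines below are an
# illustrative sketch, not part of the original document.)
# A full invocation names the deployment and the target style, e.g., assuming
# a deploy named "demo" and the `cluster` style supported by recent OBD
# versions:
#   obd cluster chst demo --style cluster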