Unverified commit 10ac254f authored by Rongfeng Fu, committed by GitHub

V1.3.0 (#93)

* v1.3.0
Parent 02b42498
......@@ -27,11 +27,12 @@ import time
import logging
from logging import handlers
from uuid import uuid1 as uuid
from optparse import OptionParser,OptionGroup
from optparse import OptionParser, OptionGroup, BadOptionError, Option
from core import ObdHome
from _stdio import IO
from log import Logger
from _errno import DOC_LINK_MSG, LockError
from tool import DirectoryUtil, FileUtil
......@@ -43,6 +44,49 @@ BUILD_TIME = '<B_TIME>'
DEBUG = True if '<DEBUG>' else False
class AllowUndefinedOptionParser(OptionParser):
def __init__(self,
usage=None,
option_list=None,
option_class=Option,
version=None,
conflict_handler="error",
description=None,
formatter=None,
add_help_option=True,
prog=None,
epilog=None,
allow_undefine=True):
OptionParser.__init__(
self, usage, option_list, option_class, version, conflict_handler,
description, formatter, add_help_option, prog, epilog
)
self.allow_undefine = allow_undefine
def warn(self, msg, file=None):
print('warn: %s' % msg)
def _process_long_opt(self, rargs, values):
try:
OptionParser._process_long_opt(self, rargs, values)
except BadOptionError as e:
if self.allow_undefine:
return self.warn(e)
else:
raise e
def _process_short_opts(self, rargs, values):
try:
OptionParser._process_short_opts(self, rargs, values)
except BadOptionError as e:
if self.allow_undefine:
return self.warn(e)
else:
raise e
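The new AllowUndefinedOptionParser downgrades unknown options to warnings whenever allow_undefine is set, which is what lets developer-mode commands accept ad-hoc flags. A minimal sketch of the behavior (option names here are illustrative, not part of the diff):
parser = AllowUndefinedOptionParser(add_help_option=False)
parser.add_option('--known', type='string')
# allow_undefine defaults to True: an undefined flag is routed through
# warn() and parsing continues instead of exiting with "no such option".
opts, args = parser.parse_args(['--known', 'v', '--undefined-flag'])
# Setting allow_undefine to False restores stock OptionParser behavior:
# BadOptionError propagates and the parser reports an error.
parser.allow_undefine = False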
class BaseCommand(object):
def __init__(self, name, summary):
......@@ -53,7 +97,8 @@ class BaseCommand(object):
self.opts = {}
self.prev_cmd = ''
self.is_init = False
self.parser = OptionParser(add_help_option=False)
self.hidden = False
self.parser = AllowUndefinedOptionParser(add_help_option=False)
self.parser.add_option('-h', '--help', action='callback', callback=self._show_help, help='Show help and exit.')
self.parser.add_option('-v', '--verbose', action='callback', callback=self._set_verbose, help='Activate verbose output.')
......@@ -91,6 +136,7 @@ class ObdCommand(BaseCommand):
OBD_PATH = os.path.join(os.environ.get('OBD_HOME', os.getenv('HOME')), '.obd')
OBD_INSTALL_PRE = os.environ.get('OBD_INSTALL_PRE', '/')
OBD_DEV_MODE_FILE = '.dev_mode'
def init_home(self):
version_path = os.path.join(self.OBD_PATH, 'version')
......@@ -99,23 +145,30 @@ class ObdCommand(BaseCommand):
version_fobj.seek(0)
version = version_fobj.read()
if VERSION != version:
obd_plugin_path = os.path.join(self.OBD_PATH, 'plugins')
if DirectoryUtil.mkdir(self.OBD_PATH):
root_plugin_path = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/plugins')
if os.path.exists(root_plugin_path):
DirectoryUtil.copy(root_plugin_path, obd_plugin_path, ROOT_IO)
obd_mirror_path = os.path.join(self.OBD_PATH, 'mirror')
obd_remote_mirror_path = os.path.join(self.OBD_PATH, 'mirror/remote')
if DirectoryUtil.mkdir(obd_mirror_path):
root_remote_mirror = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/mirror/remote')
if os.path.exists(root_remote_mirror):
DirectoryUtil.copy(root_remote_mirror, obd_remote_mirror_path, ROOT_IO)
for part in ['plugins', 'config_parser', 'mirror/remote']:
obd_part_dir = os.path.join(self.OBD_PATH, part)
if DirectoryUtil.mkdir(self.OBD_PATH):
root_part_path = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/', part)
if os.path.exists(root_part_path):
DirectoryUtil.copy(root_part_path, obd_part_dir, ROOT_IO)
version_fobj.seek(0)
version_fobj.truncate()
version_fobj.write(VERSION)
version_fobj.flush()
version_fobj.close()
@property
def dev_mode_path(self):
return os.path.join(self.OBD_PATH, self.OBD_DEV_MODE_FILE)
@property
def dev_mode(self):
return os.path.exists(self.dev_mode_path)
def parse_command(self):
self.parser.allow_undefine = self.dev_mode
return super(ObdCommand, self).parse_command()
def do_command(self):
self.parse_command()
self.init_home()
......@@ -127,7 +180,7 @@ class ObdCommand(BaseCommand):
log_path = os.path.join(log_dir, 'obd')
logger = Logger('obd')
handler = handlers.TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30)
handler.setFormatter(logging.Formatter("[%%(asctime)s] [%s] [%%(levelname)s] %%(message)s" % trace_id, "%Y-%m-%d %H:%M:%S"))
handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%s] [%%(levelname)s] %%(message)s" % trace_id, "%Y-%m-%d %H:%M:%S"))
logger.addHandler(handler)
ROOT_IO.trace_logger = logger
obd = ObdHome(self.OBD_PATH, ROOT_IO)
......@@ -135,14 +188,17 @@ class ObdCommand(BaseCommand):
ROOT_IO.verbose('cmd: %s' % self.cmds)
ROOT_IO.verbose('opts: %s' % self.opts)
ret = self._do_command(obd)
if not ret:
ROOT_IO.print(DOC_LINK_MSG)
except NotImplementedError:
ROOT_IO.exception('command \'%s\' is not implemented' % self.prev_cmd)
except IOError:
except LockError:
ROOT_IO.exception('Another app is currently holding the obd lock.')
except SystemExit:
pass
except:
ROOT_IO.exception('Running Error.')
e = sys.exc_info()[1]
ROOT_IO.exception('Running Error: %s' % e)
if DEBUG:
ROOT_IO.print('Trace ID: %s' % trace_id)
return ret
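With the formatter change above, trace-log entries gain millisecond precision. A line now looks roughly like this (trace ID shortened for illustration):
[2021-12-01 12:00:00.123] [8a2e9d3c-...] [INFO] cmd: ['deploy']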
......@@ -163,7 +219,8 @@ class MajorCommand(BaseCommand):
commands = [x for x in self.commands.values() if not (hasattr(x, 'hidden') and x.hidden)]
commands.sort(key=lambda x: x.name)
for command in commands:
usage.append("%-14s %s\n" % (command.name, command.summary))
if command.hidden is False:
usage.append("%-14s %s\n" % (command.name, command.summary))
self.parser.set_usage('\n'.join(usage))
return super(MajorCommand, self)._mk_usage()
......@@ -188,6 +245,63 @@ class MajorCommand(BaseCommand):
self.commands[command.name] = command
class HiddenObdCommand(ObdCommand):
def __init__(self, name, summary):
super(HiddenObdCommand, self).__init__(name, summary)
self.hidden = self.dev_mode is False
class HiddenMajorCommand(MajorCommand, HiddenObdCommand):
pass
class DevCommand(HiddenObdCommand):
def do_command(self):
if self.hidden:
ROOT_IO.error('`%s` is a developer command. Please enable developer mode first.\nUse `obd devmode enable` to enable it.' % self.prev_cmd)
return False
return super(DevCommand, self).do_command()
class DevModeEnableCommand(HiddenObdCommand):
def __init__(self):
super(DevModeEnableCommand, self).__init__('enable', 'Enable Dev Mode')
def _do_command(self, obd):
from tool import FileUtil
if FileUtil.open(self.dev_mode_path, _type='w', stdio=obd.stdio):
obd.stdio.print("Dev Mode: ON")
return True
return False
class DevModeDisableCommand(HiddenObdCommand):
def __init__(self):
super(DevModeDisableCommand, self).__init__('disable', 'Disable Dev Mode')
def _do_command(self, obd):
from tool import FileUtil
if FileUtil.rm(self.dev_mode_path, stdio=obd.stdio):
obd.stdio.print("Dev Mode: OFF")
return True
return False
class DevModeMajorCommand(HiddenMajorCommand):
def __init__(self):
super(DevModeMajorCommand, self).__init__('devmode', 'Developer mode switch')
self.register_command(DevModeEnableCommand())
self.register_command(DevModeDisableCommand())
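Taken together, the hidden-command plumbing above gives the developer-mode switch this CLI surface (a sketch; the flag file path follows the OBD_PATH default of ~/.obd):
# obd devmode enable    -> creates ~/.obd/.dev_mode and prints "Dev Mode: ON"
# obd devmode disable   -> removes the flag file and prints "Dev Mode: OFF"
# While the flag file is absent, HiddenObdCommand subclasses stay out of
# the usage listing and DevCommand refuses to run.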
class MirrorCloneCommand(ObdCommand):
def __init__(self):
......@@ -246,9 +360,12 @@ class MirrorListCommand(ObdCommand):
self.show_pkg(name, pkgs)
return True
else:
repos = obd.mirror_manager.get_mirrors()
repos = obd.mirror_manager.get_mirrors(is_enabled=None)
for repo in repos:
if repo.section_name == name:
if not repo.enabled:
ROOT_IO.error('Mirror repository %s is disabled.' % name)
return False
pkgs = repo.get_all_pkg_info()
self.show_pkg(name, pkgs)
return True
......@@ -292,7 +409,7 @@ class MirrorEnableCommand(ObdCommand):
def _do_command(self, obd):
name = self.cmds[0]
obd.mirror_manager.set_remote_mirror_enabled(name, True)
return obd.mirror_manager.set_remote_mirror_enabled(name, True)
class MirrorDisableCommand(ObdCommand):
......@@ -302,7 +419,7 @@ class MirrorDisableCommand(ObdCommand):
def _do_command(self, obd):
name = self.cmds[0]
obd.mirror_manager.set_remote_mirror_enabled(name, False)
return obd.mirror_manager.set_remote_mirror_enabled(name, False)
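Because _do_command now returns the manager's result instead of discarding it, enable/disable failures propagate to the command's exit status. An illustrative session (the error text comes from the manager changes later in this diff):
# $ obd mirror disable local
# Local mirror repository CANNOT BE DISABLED.
# (the command now fails because set_remote_mirror_enabled returned False)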
class MirrorMajorCommand(MajorCommand):
......@@ -352,6 +469,35 @@ class ClusterMirrorCommand(ObdCommand):
return self
class ClusterConfigStyleChange(ClusterMirrorCommand):
def __init__(self):
super(ClusterConfigStyleChange, self).__init__('chst', 'Change Deployment Configuration Style')
self.parser.add_option('-c', '--components', type='string', help="List the components. Multiple components are separated with commas.")
self.parser.add_option('--style', type='string', help="Preferred Style")
def _do_command(self, obd):
if self.cmds:
return obd.change_deploy_config_style(self.cmds[0], self.opts)
else:
return self._show_help()
class ClusterCheckForOCPChange(ClusterMirrorCommand):
def __init__(self):
super(ClusterCheckForOCPChange, self).__init__('check4ocp', 'Check Whether OCP Can Take Over Configurations in Use')
self.parser.add_option('-c', '--components', type='string', help="List the components. Multiple components are separated with commas.")
self.parser.add_option('-V', '--version', type='string', help="OCP Version", default='3.1.1')
def _do_command(self, obd):
if self.cmds:
return obd.check_for_ocp(self.cmds[0], self.opts)
else:
return self._show_help()
class ClusterAutoDeployCommand(ClusterMirrorCommand):
def __init__(self):
......@@ -413,7 +559,7 @@ class ClusterStopCommand(ClusterMirrorCommand):
def __init__(self):
super(ClusterStopCommand, self).__init__('stop', 'Stop a started cluster.')
self.parser.add_option('-s', '--servers', type='string', help="List the started servers. Multiple servers are separated with commas.")
self.parser.add_option('-c', '--components', type='string', help="List the started components. Multiple components are separated with commas.")
self.parser.add_option('-c', '--components', type='string', help="List the stopped components. Multiple components are separated with commas.")
def _do_command(self, obd):
if self.cmds:
......@@ -452,7 +598,7 @@ class ClusterRestartCommand(ClusterMirrorCommand):
def __init__(self):
super(ClusterRestartCommand, self).__init__('restart', 'Restart a started cluster.')
self.parser.add_option('-s', '--servers', type='string', help="List the started servers. Multiple servers are separated with commas.")
self.parser.add_option('-c', '--components', type='string', help="List the started components. Multiple components are separated with commas.")
self.parser.add_option('-c', '--components', type='string', help="List the restarted components. Multiple components are separated with commas.")
self.parser.add_option('--with-parameter', '--wp', action='store_true', help='Restart with parameters.')
def _do_command(self, obd):
......@@ -588,6 +734,8 @@ class ClusterMajorCommand(MajorCommand):
def __init__(self):
super(ClusterMajorCommand, self).__init__('cluster', 'Deploy and manage a cluster.')
self.register_command(ClusterCheckForOCPChange())
self.register_command(ClusterConfigStyleChange())
self.register_command(ClusterAutoDeployCommand())
self.register_command(ClusterDeployCommand())
self.register_command(ClusterStartCommand())
......@@ -690,7 +838,7 @@ class TPCHCommand(TestMirrorCommand):
self.parser.add_option('--tenant', type='string', help='Tenant for a test. [test]', default='test')
self.parser.add_option('--database', type='string', help='Database for a test. [test]', default='test')
self.parser.add_option('--obclient-bin', type='string', help='OBClient bin path. [obclient]', default='obclient')
self.parser.add_option('--dbgen-bin', type='string', help='dbgen bin path. [/usr/local/tpc-h-tools/bin/dbgen]', default='/usr/local/tpc-h-tools/bin/dbgen')
self.parser.add_option('--dbgen-bin', type='string', help='dbgen bin path. [/usr/tpc-h-tools/tpc-h-tools/bin/dbgen]', default='/usr/tpc-h-tools/tpc-h-tools/bin/dbgen')
self.parser.add_option('-s', '--scale-factor', type='int', help='Set Scale Factor (SF) to <n>. [1] ', default=1)
self.parser.add_option('--tmp-dir', type='string', help='The temporary directory for executing TPC-H. [./tmp]', default='./tmp')
self.parser.add_option('--ddl-path', type='string', help='Directory for DDL files.')
......@@ -698,7 +846,7 @@ class TPCHCommand(TestMirrorCommand):
self.parser.add_option('--sql-path', type='string', help='Directory for SQL files.')
self.parser.add_option('--remote-tbl-dir', type='string', help='Directory for the tbl on target observers. Make sure that you have read and write access to the directory when you start observer.')
self.parser.add_option('--disable-transfer', '--dt', action='store_true', help='Disable the transfer. When enabled, OBD uses the tbl files under remote-tbl-dir instead of transferring local tbl files to the remote remote-tbl-dir.')
self.parser.add_option('--dss-config', type='string', help='Directory for dists.dss. [/usr/local/tpc-h-tools]', default='/usr/local/tpc-h-tools/')
self.parser.add_option('--dss-config', type='string', help='Directory for dists.dss. [/usr/tpc-h-tools/tpc-h-tools]', default='/usr/tpc-h-tools/tpc-h-tools/')
self.parser.add_option('-O', '--optimization', type='int', help='Optimization level {0/1}. [1]', default=1)
self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.')
......@@ -709,6 +857,37 @@ class TPCHCommand(TestMirrorCommand):
return self._show_help()
class TPCCCommand(TestMirrorCommand):
def __init__(self):
super(TPCCCommand, self).__init__('tpcc', 'Run a TPC-C test for a deployment.')
self.parser.add_option('--component', type='string', help='Components for a test.')
self.parser.add_option('--test-server', type='string', help='The server for a test. By default, the first root server in the component is the test server.')
self.parser.add_option('--user', type='string', help='Username for a test. [root]', default='root')
self.parser.add_option('--password', type='string', help='Password for a test.')
self.parser.add_option('--tenant', type='string', help='Tenant for a test. [test]', default='test')
self.parser.add_option('--database', type='string', help='Database for a test. [test]', default='test')
self.parser.add_option('--obclient-bin', type='string', help='OBClient bin path. [obclient]', default='obclient')
self.parser.add_option('--java-bin', type='string', help='Java bin path. [java]', default='java')
self.parser.add_option('--tmp-dir', type='string', help='The temporary directory for executing TPC-C. [./tmp]', default='./tmp')
self.parser.add_option('--bmsql-dir', type='string', help='The directory of BenchmarkSQL.')
self.parser.add_option('--bmsql-jar', type='string', help='BenchmarkSQL jar path.')
self.parser.add_option('--bmsql-libs', type='string', help='BenchmarkSQL libs path.')
self.parser.add_option('--bmsql-sql-dir', type='string', help='The directory of BenchmarkSQL sql scripts.')
self.parser.add_option('--warehouses', type='int', help='The number of warehouses.')
self.parser.add_option('--load-workers', type='int', help='The number of workers to load data.')
self.parser.add_option('--terminals', type='int', help='The number of terminals.')
self.parser.add_option('--run-mins', type='int', help='The number of minutes to run. [10]', default=10)
self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.')
self.parser.add_option('-O', '--optimization', type='int', help='Optimization level {0/1/2}. [1] 0 - No optimization. 1 - Optimize some of the parameters which do not need to restart servers. 2 - Optimize all the parameters and maybe RESTART SERVERS for better performance.', default=1)
def _do_command(self, obd):
if self.cmds:
return obd.tpcc(self.cmds[0], self.opts)
else:
return self._show_help()
class TestMajorCommand(MajorCommand):
def __init__(self):
......@@ -716,6 +895,7 @@ class TestMajorCommand(MajorCommand):
self.register_command(MySQLTestCommand())
self.register_command(SysBenchCommand())
self.register_command(TPCHCommand())
# self.register_command(TPCCCommand())
class BenchMajorCommand(MajorCommand):
......@@ -743,6 +923,7 @@ class MainCommand(MajorCommand):
def __init__(self):
super(MainCommand, self).__init__('obd', '')
self.register_command(DevModeMajorCommand())
self.register_command(MirrorMajorCommand())
self.register_command(ClusterMajorCommand())
self.register_command(RepositoryMajorCommand())
......
This diff is collapsed.
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from enum import Enum
class LockError(Exception):
pass
class OBDErrorCode(object):
def __init__(self, code, msg):
self.code = code
self.msg = msg
self._str_ = ('OBD-%04d: ' % code) + msg
def format(self, *args, **kwargs):
return self._str_.format(*args, **kwargs)
def __str__(self):
return self._str_
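A short usage sketch of the error-code class, reusing one of the codes defined below:
err = OBDErrorCode(1001, '{server}:{port} port is already used')
print(err.format(server='127.0.0.1', port=2881))
# -> OBD-1001: 127.0.0.1:2881 port is already used
print(err)
# -> OBD-1001: {server}:{port} port is already used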
class InitDirFailedErrorMessage(object):
PATH_ONLY = ': {path}.'
NOT_EMPTY = ': {path} is not empty.'
CREATE_FAILED = ': create {path} failed.'
PERMISSION_DENIED = ': {path} permission denied.'
DOC_LINK_MSG = 'See https://open.oceanbase.com/docs/obd-cn/V1.2.0/10000000000017237.'
EC_CONFIG_CONFLICT_PORT = OBDErrorCode(1000, 'Configuration conflict {server1}:{port} port is used for {server2}\'s {key}')
EC_CONFLICT_PORT = OBDErrorCode(1001, '{server}:{port} port is already used')
EC_FAIL_TO_INIT_PATH = OBDErrorCode(1002, 'Fail to init {server} {key}{msg}')
EC_CLEAN_PATH_FAILED = OBDErrorCode(1003, 'Fail to clean {server}:{path}')
EC_CONFIG_CONFLICT_DIR = OBDErrorCode(1004, 'Configuration conflict {server1}: {path} is used for {server2}\'s {key}')
EC_SOME_SERVER_STOPED = OBDErrorCode(1005, 'Some of the servers in the cluster have been stopped')
EC_OBSERVER_NOT_ENOUGH_MEMORY = OBDErrorCode(2000, '({ip}) not enough memory. (Free: {free}, Need: {need})')
EC_OBSERVER_CAN_NOT_MIGRATE_IN = OBDErrorCode(2001, 'server can not migrate in')
EC_OBSERVER_FAIL_TO_START = OBDErrorCode(2002, 'Failed to start {server} observer')
EC_OBSERVER_NOT_ENOUGH_DISK_4_CLOG = OBDErrorCode(2003, '({ip}) {path} not enough disk space for clog. Use redo_dir to set other disk for clog, or reduce the value of datafile_size')
EC_OBSERVER_INVALID_MODFILY_GLOBAL_KEY = OBDErrorCode(2004, 'Invalid: {key} is not a single server configuration item')
EC_MYSQLTEST_PARSE_CMD_FAILED = OBDErrorCode(3000, 'parse cmd failed: {path}')
EC_MYSQLTEST_FAILE_NOT_FOUND = OBDErrorCode(3001, '{file} not found in {path}')
EC_OBAGENT_RELOAD_FAILED = OBDErrorCode(4000, 'Fail to reload {server}')
EC_OBAGENT_SEND_CONFIG_FAILED = OBDErrorCode(4001, 'Fail to send config file to {server}')
......@@ -26,6 +26,7 @@ from enum import Enum
from tool import FileUtil
from _manager import Manager
from _errno import LockError
class LockType(Enum):
......@@ -59,11 +60,17 @@ class MixLock(object):
def _ex_lock(self):
if self.lock_obj:
FileUtil.exclusive_lock_obj(self.lock_obj, stdio=self.stdio)
try:
FileUtil.exclusive_lock_obj(self.lock_obj, stdio=self.stdio)
except Exception as e:
raise LockError(e)
def _sh_lock(self):
if self.lock_obj:
FileUtil.share_lock_obj(self.lock_obj, stdio=self.stdio)
try:
FileUtil.share_lock_obj(self.lock_obj, stdio=self.stdio)
except Exception as e:
raise LockError(e)
def sh_lock(self):
if not self.locked:
......@@ -80,7 +87,7 @@ class MixLock(object):
if self._sh_cnt:
self.lock_escalation(LockManager.TRY_TIMES)
else:
raise e
raise LockError(e)
self._ex_cnt += 1
self.stdio and getattr(self.stdio, 'verbose', print)('exclusive lock `%s`, count %s' % (self.path, self._ex_cnt))
return True
......@@ -92,7 +99,7 @@ class MixLock(object):
self.stdio and getattr(self.stdio, 'stop_loading', print)('succeed')
except Exception as e:
self.stdio and getattr(self.stdio, 'stop_loading', print)('fail')
raise e
raise LockError(e)
def _lock_escalation(self, try_times):
stdio = self.stdio
......@@ -107,13 +114,13 @@ class MixLock(object):
break
except KeyboardInterrupt:
self.stdio = stdio
raise IOError('fail to get lock')
raise Exception('fail to get lock')
except Exception as e:
if try_times:
time.sleep(LockManager.TRY_INTERVAL)
else:
self.stdio = stdio
raise e
raise LockError(e)
self.stdio = stdio
def _sh_unlock(self):
......@@ -209,12 +216,19 @@ class LockManager(Manager):
def __del__(self):
for lock in self.locks[::-1]:
lock.unlock()
self.locks = None
def _get_mix_lock(self, path):
if path not in self.LOCKS:
self.LOCKS[path] = MixLock(path, stdio=self.stdio)
return self.LOCKS[path]
@classmethod
def shutdown(cls):
for path in cls.LOCKS:
cls.LOCKS[path] = None
cls.LOCKS = None
def _lock(self, path, clz):
mix_lock = self._get_mix_lock(path)
lock = clz(mix_lock)
......@@ -248,3 +262,7 @@ class LockManager(Manager):
def deploy_sh_lock(self, deploy_name):
return self._sh_lock(self._deploy_lock_fp(deploy_name))
import atexit
atexit.register(LockManager.shutdown)
\ No newline at end of file
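Every lock-acquisition failure now surfaces as LockError, which the CLI's new `except LockError` branch in do_command turns into a friendly message. A minimal sketch, assuming Manager's (home_path, stdio) constructor and an illustrative deploy name:
lock_manager = LockManager('/tmp/obd-home', stdio=None)
try:
    lock_manager.deploy_sh_lock('demo-deploy')
except LockError:
    print('Another app is currently holding the obd lock.')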
......@@ -534,7 +534,7 @@ class RemoteMirrorRepository(MirrorRepository):
if release and info.release != release:
raise Exception('break')
return [0 ,]
c = [len(name) / len(info.name), info]
return c
......@@ -763,7 +763,7 @@ class LocalMirrorRepository(MirrorRepository):
return [0 ,]
if release and info.release != release:
return [0 ,]
c = [len(name) / len(info.name), info]
return c
......@@ -1008,8 +1008,8 @@ class MirrorRepositoryManager(Manager):
enabled_str = '1' if enabled else '0'
self.stdio and getattr(self.stdio, 'start_loading')('%s %s' % (op, section_name))
if section_name == 'local':
self.stdio and getattr(self.stdio, 'error', print)('%s is local mirror repository.' % (section_name))
return
self.stdio and getattr(self.stdio, 'error', print)('Local mirror repository CANNOT BE %sD.' % op.upper())
return False
if section_name == 'remote':
self._scan_repo_configs()
for repo_config in self._cache_path_repo_config.values():
......@@ -1024,21 +1024,22 @@ class MirrorRepositoryManager(Manager):
mirror_section.meta_data['enabled'] = enabled_str
mirror_section.meta_data['repo_age'] = repo_age
self.stdio and getattr(self.stdio, 'stop_loading')('succeed')
return True
else:
mirror_section = self._get_section(section_name)
if not mirror_section:
self.stdio and getattr(self.stdio, 'error', print)('%s not found.' % (section_name))
self.stdio and getattr(self.stdio, 'stop_loading')('fail')
return
return False
if mirror_section.is_enabled == enabled:
self.stdio and getattr(self.stdio, 'print', print)('%s is already %sd' % (section_name, op))
self.stdio and getattr(self.stdio, 'print', print)('%s is already %sd' % (section_name, op.lower()))
self.stdio and getattr(self.stdio, 'stop_loading')('succeed')
return
return True
repo_config = self._get_repo_config_by_section(section_name)
if not repo_config:
self.stdio and getattr(self.stdio, 'error', print)('%s not found.' % (section_name))
self.stdio and getattr(self.stdio, 'stop_loading')('fail')
return
return False
repo_config.parser.set(section_name, 'enabled', enabled_str)
with FileUtil.open(repo_config.path, 'w', stdio=self.stdio) as confpp_obj:
......@@ -1049,4 +1050,5 @@ class MirrorRepositoryManager(Manager):
repo_config.repo_age = repo_age
mirror_section.meta_data['enabled'] = enabled_str
mirror_section.meta_data['repo_age'] = repo_age
self.stdio and getattr(self.stdio, 'stop_loading')('succeed')
\ No newline at end of file
self.stdio and getattr(self.stdio, 'stop_loading')('succeed')
return True
......@@ -36,6 +36,7 @@ if sys.version_info.major == 2:
from backports import lzma
setattr(sys.modules['rpmfile'], 'lzma', getattr(sys.modules[__name__], 'lzma'))
class Version(str):
def __init__(self, bytes_or_buffer, encoding=None, errors=None):
......
......@@ -21,6 +21,7 @@
from __future__ import absolute_import, division, print_function
import os
import signal
import sys
import traceback
......@@ -28,7 +29,7 @@ from enum import Enum
from halo import Halo, cursor
from colorama import Fore
from prettytable import PrettyTable
from progressbar import Bar, ETA, FileTransferSpeed, Percentage, ProgressBar
from progressbar import AdaptiveETA, Bar, SimpleProgress, ETA, FileTransferSpeed, Percentage, ProgressBar
if sys.version_info.major == 3:
......@@ -145,11 +146,19 @@ class IOHalo(Halo):
class IOProgressBar(ProgressBar):
def __init__(self, maxval=None, text='', term_width=None, poll=1, left_justify=True, stream=None):
widgets=['%s: ' % text, Percentage(), ' ',
Bar(marker='#', left='[', right=']'),
' ', ETA(), ' ', FileTransferSpeed()]
super(IOProgressBar, self).__init__(maxval=maxval, widgets=widgets, term_width=term_width, poll=poll, left_justify=left_justify, fd=stream)
@staticmethod
def _get_widgets(widget_type, text):
if widget_type == 'download':
return ['%s: ' % text, Percentage(), ' ', Bar(marker='#', left='[', right=']'), ' ', ETA(), ' ', FileTransferSpeed()]
elif widget_type == 'timer':
return ['%s: ' % text, Percentage(), ' ', Bar(marker='#', left='[', right=']'), ' ', AdaptiveETA()]
elif widget_type == 'simple_progress':
return ['%s: (' % text, SimpleProgress(sep='/'), ') ', Bar(marker='#', left='[', right=']')]
else:
return ['%s: ' % text, Percentage(), ' ', Bar(marker='#', left='[', right=']')]
def __init__(self, maxval=None, text='', term_width=None, poll=1, left_justify=True, stream=None, widget_type='download'):
super(IOProgressBar, self).__init__(maxval=maxval, widgets=self._get_widgets(widget_type, text), term_width=term_width, poll=poll, left_justify=left_justify, fd=stream)
def start(self):
self._hide_cursor()
......@@ -159,9 +168,20 @@ class IOProgressBar(ProgressBar):
return super(IOProgressBar, self).update(value=value)
def finish(self):
if self.finished:
return
self._show_cursor()
return super(IOProgressBar, self).finish()
def interrupt(self):
if self.finished:
return
self._show_cursor()
self.finished = True
self.fd.write('\n')
if self.signal_set:
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
def _need_update(self):
return (self.currval == self.maxval or self.currval == 0 or getattr(self.fd, 'isatty', lambda : False)()) \
and super(IOProgressBar, self)._need_update()
......@@ -356,10 +376,17 @@ class IO(object):
else:
return self._stop_sync_obj(IOHalo, 'stop')
def start_progressbar(self, text, maxval):
def update_loading_text(self, text):
if not isinstance(self.sync_obj, IOHalo):
return False
self.log(MsgLevel.VERBOSE, text)
self.sync_obj.text = text
return self.sync_obj
def start_progressbar(self, text, maxval, widget_type='download'):
if self.sync_obj:
return False
self.sync_obj = self._start_sync_obj(IOProgressBar, lambda x: x.finish_progressbar(), text=text, maxval=maxval)
self.sync_obj = self._start_sync_obj(IOProgressBar, lambda x: x.finish_progressbar(), text=text, maxval=maxval, widget_type=widget_type)
if self.sync_obj:
self.log(MsgLevel.INFO, text)
return self.sync_obj.start()
......@@ -373,6 +400,11 @@ class IO(object):
if not isinstance(self.sync_obj, IOProgressBar):
return False
return self._stop_sync_obj(IOProgressBar, 'finish')
def interrupt_progressbar(self):
if not isinstance(self.sync_obj, IOProgressBar):
return False
return self._stop_sync_obj(IOProgressBar, 'interrupt')
def sub_io(self, pid=None, msg_lv=None):
if not pid:
......@@ -381,7 +413,7 @@ class IO(object):
msg_lv = self.msg_lv
key = "%s-%s" % (pid, msg_lv)
if key not in self.sub_ios:
self.sub_ios[key] = IO(
self.sub_ios[key] = self.__class__(
self.level + 1,
msg_lv=msg_lv,
trace_logger=self.trace_logger,
......
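The progress-bar rework is easiest to see from the IO entry points. A sketch, assuming IO's existing update_progressbar helper:
io = IO(1)
# 'timer' swaps the ETA/FileTransferSpeed pair for AdaptiveETA;
# 'simple_progress' renders "(current/total)"; the default 'download'
# keeps the original widget layout.
io.start_progressbar('Executing', 100, widget_type='simple_progress')
for i in range(100):
    io.update_progressbar(i)
io.finish_progressbar()
# interrupt_progressbar() is the new cancellation path: it restores the
# cursor, marks the bar finished, and terminates the output line.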
oceanbase
\ No newline at end of file
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import re
from copy import deepcopy
from collections import OrderedDict
from tool import ConfigUtil
from _deploy import (
InnerConfigItem,
ServerConfigFlyweightFactory,
ClusterConfig,
ConfigParser,
CommentedMap
)
class ClusterConfigParser(ConfigParser):
STYLE = 'cluster'
INNER_CONFIG_MAP = {
'$_zone_idc': 'idc'
}
@classmethod
def get_server_src_conf(cls, cluster_config, component_config, server):
server_config = cluster_config.get_server_conf(server)
zones = component_config['zones']
zone_name = server_config.get('zone', list(zones.keys())[0])
zone = zones[zone_name]
if 'config' not in zone:
zone['config'] = {}
zone_config = zone['config']
if server.name not in zone_config:
zone_config[server.name] = {}
return zone_config[server.name]
@classmethod
def _to_cluster_config(cls, component_name, conf):
servers = OrderedDict()
zones = conf.get('zones', {})
for zone_name in zones:
zone = zones[zone_name]
zone_servers = zone.get('servers', [])
zone_configs = zone.get('config', {})
zone_global_config = zone_configs.get('global', {})
for server in zone_servers:
if isinstance(server, dict):
ip = ConfigUtil.get_value_from_dict(server, 'ip', transform_func=str)
name = ConfigUtil.get_value_from_dict(server, 'name', transform_func=str)
else:
ip = server
name = None
if not re.match('^\d{1,3}(\\.\d{1,3}){3}$', ip):
continue
server = ServerConfigFlyweightFactory.get_instance(ip, name)
if server not in servers:
server_config = deepcopy(zone_global_config)
if server.name in zone_configs:
server_config.update(zone_configs[server.name])
if 'idc' in zone:
key = '$_zone_idc'
if key in server_config:
del server_config[key]
server_config[InnerConfigItem(key)] = str(zone['idc'])
server_config['zone'] = zone_name
servers[server] = server_config
cluster_conf = ClusterConfig(
servers.keys(),
component_name,
ConfigUtil.get_value_from_dict(conf, 'version', None, str),
ConfigUtil.get_value_from_dict(conf, 'tag', None, str),
ConfigUtil.get_value_from_dict(conf, 'package_hash', None, str)
)
global_config = {}
if 'id' in conf:
global_config['cluster_id'] = int(conf['id'])
if 'name' in conf:
global_config['appname'] = str(conf['name'])
if 'config' in conf:
global_config.update(conf['config'])
cluster_conf.set_global_conf(global_config)
for server in servers:
cluster_conf.add_server_conf(server, servers[server])
return cluster_conf
@classmethod
def extract_inner_config(cls, cluster_config, config):
inner_config = cluster_config.get_inner_config()
for server in inner_config:
server_config = inner_config[server]
keys = list(server_config.keys())
for key in keys:
if key in cls.INNER_CONFIG_MAP:
del server_config[key]
for server in cluster_config.servers:
if server.name not in inner_config:
inner_config[server.name] = {}
server_config = cluster_config.get_server_conf(server)
keys = list(server_config.keys())
for key in keys:
if cls._is_inner_item(key) and key not in cls.INNER_CONFIG_MAP:
inner_config[server.name][key] = server_config[key]
del server_config[key]
return inner_config
@classmethod
def _from_cluster_config(cls, conf, cluster_config):
global_config_items = {}
zones_config = {}
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
server_config_with_default = cluster_config.get_server_conf_with_default(server)
zone_name = server_config_with_default.get('zone', 'zone1')
if zone_name not in zones_config:
zones_config[zone_name] = {
'servers': OrderedDict(),
'config': OrderedDict(),
}
zone_servers = zones_config[zone_name]['servers']
zone_config_items = zones_config[zone_name]['config']
zone_servers[server] = server_config
for key in server_config:
if key in zone_config_items:
if zone_config_items[key]['value'] == server_config[key]:
zone_config_items[key]['count'] += 1
else:
zone_config_items[key] = {
'value': server_config[key],
'count': 1
}
zones = CommentedMap()
server_num = len(cluster_config.servers)
for zone_name in zones_config:
zones[zone_name] = CommentedMap()
zone_global_config = {}
zone_servers = zones_config[zone_name]['servers']
zone_config_items = zones_config[zone_name]['config']
zone_server_num = len(zone_servers)
zone_global_config_items = {}
for key in zone_config_items:
item = zone_config_items[key]
clear_item = isinstance(key, InnerConfigItem)
if item['count'] == zone_server_num:
zone_global_config_items[key] = item['value']
if key in global_config_items:
if global_config_items[key]['value'] == zone_global_config_items[key]:
global_config_items[key]['count'] += 1
else:
global_config_items[key] = {
'value': zone_global_config_items[key],
'count': 1
}
clear_item = True
if clear_item:
for server in zone_servers:
del zone_servers[server][key]
for key in zone_global_config_items:
if cls._is_inner_item(key):
if key in cls.INNER_CONFIG_MAP:
zones[zone_name][cls.INNER_CONFIG_MAP[key]] = zone_global_config_items[key]
else:
zone_global_config[key] = zone_global_config_items[key]
zones[zone_name]['servers'] = []
zone_config = {}
if 'zone' in zone_global_config:
del zone_global_config['zone']
if zone_global_config:
zone_config['global'] = zone_global_config
for server in zone_servers:
if server.name == server.ip:
zones[zone_name]['servers'].append(server.name)
else:
zones[zone_name]['servers'].append({'name': server.name, 'ip': server.ip})
if zone_servers[server]:
zone_config[server.name] = zone_servers[server]
if zone_config:
zones[zone_name]['config'] = zone_config
global_config = CommentedMap()
zone_num = len(zones)
del global_config_items['zone']
for key in global_config_items:
item = global_config_items[key]
if item['count'] == zone_num:
global_config[key] = item['value']
for zone_name in zones:
del zones[zone_name]['config']['global'][key]
for zone_name in zones:
if 'config' in zones[zone_name]:
configs = zones[zone_name]['config']
keys = list(configs.keys())
for key in keys:
if not configs[key]:
del configs[key]
if not configs:
del zones[zone_name]['config']
if 'cluster_id' in global_config:
conf['id'] = global_config['cluster_id']
del global_config['cluster_id']
if 'appname' in global_config:
conf['name'] = global_config['appname']
del global_config['appname']
if global_config:
conf['config'] = global_config
conf['zones'] = zones
return conf
\ No newline at end of file
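For orientation, an illustrative input for ClusterConfigParser._to_cluster_config, with the shape inferred from the parsing code above (all values made up):
conf = {
    'id': 1,
    'name': 'demo',
    'zones': {
        'zone1': {
            'idc': 'hz1',
            'servers': ['192.168.1.2', {'name': 'server2', 'ip': '192.168.1.3'}],
            'config': {
                'global': {'memory_limit': '16G'},
                'server2': {'home_path': '/data/observer'},
            },
        },
    },
}
# Produces a ClusterConfig whose servers carry zone='zone1', the zone's
# idc stored as the inner '$_zone_idc' item, and 'id'/'name' promoted to
# cluster_id/appname in the global config.
cluster_conf = ClusterConfigParser._to_cluster_config('oceanbase-ce', conf)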
This diff is collapsed.
......@@ -24,6 +24,8 @@ import re
import os
from ssh import LocalClient
from _errno import EC_MYSQLTEST_FAILE_NOT_FOUND, EC_MYSQLTEST_PARSE_CMD_FAILED
def parse_size(size):
_bytes = 0
......@@ -69,7 +71,7 @@ def init(plugin_context, env, *args, **kwargs):
def exec_sql(cmd):
ret = re.match('(.*\.sql)(?:\|([^\|]*))?(?:\|([^\|]*))?', cmd)
if not ret:
stdio.error('parse cmd failed: %s' % cmd)
stdio.error(EC_MYSQLTEST_PARSE_CMD_FAILED.format(path=cmd))
return False
cmd = ret.groups()
sql_file_path1 = os.path.join(init_sql_dir, cmd[0])
......@@ -79,7 +81,7 @@ def init(plugin_context, env, *args, **kwargs):
elif os.path.isfile(sql_file_path2):
sql_file_path = sql_file_path2
else:
stdio.error('%s not found in [%s, %s]' % (cmd[0], init_sql_dir, plugin_init_sql_dir))
stdio.error(EC_MYSQLTEST_FAILE_NOT_FOUND.format(file=cmd[0], path='[%s, %s]' % (init_sql_dir, plugin_init_sql_dir)))
return False
exec_sql_cmd = exec_sql_temp % (cmd[1] if cmd[1] else 'root', cmd[2] if cmd[2] else 'oceanbase', sql_file_path)
ret = LocalClient.execute_command(exec_sql_cmd, stdio=stdio)
......
create resource unit box1 max_cpu 2, max_memory 1073741824, max_iops 128, max_disk_size '5G', max_session_num 64, MIN_CPU=1, MIN_MEMORY=1073741824, MIN_IOPS=128;
create resource pool pool1 unit = 'box1', unit_num = 1;
create tenant ora_tt replica_num = 1, resource_pool_list=('pool1') set ob_tcp_invited_nodes='%', ob_compatibility_mode='oracle';
alter tenant ora_tt set variables autocommit='on';
alter tenant ora_tt set variables nls_date_format='YYYY-MM-DD HH24:MI:SS';
alter tenant ora_tt set variables nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF';
alter tenant ora_tt set variables nls_timestamp_tz_format='YYYY-MM-DD HH24:MI:SS.FF TZR TZD';
\ No newline at end of file
system sleep 5;
alter system set balancer_idle_time = '60s';
create user 'admin' IDENTIFIED BY 'admin';
use oceanbase;
create database if not exists test;
use test;
grant all on *.* to 'admin' WITH GRANT OPTION;
set global ob_enable_jit='OFF';
alter system set large_query_threshold='1s';
alter system set syslog_level='info';
alter system set syslog_io_bandwidth_limit='30M';
alter system set trx_try_wait_lock_timeout='0';
alter system set zone_merge_concurrency=0;
alter system set merger_completion_percentage=100;
alter system set trace_log_slow_query_watermark='500ms';
alter system set minor_freeze_times=30;
alter system set clog_sync_time_warn_threshold = '1000ms';
alter system set trace_log_slow_query_watermark = '10s';
alter system set enable_sql_operator_dump = 'false';
alter system set rpc_timeout=1000000000;
create resource unit tpch_box1 min_memory '100g', max_memory '100g', max_disk_size '1000g', max_session_num 64, min_cpu=9, max_cpu=9, max_iops 128, min_iops=128;
create resource pool tpch_pool1 unit = 'tpch_box1', unit_num = 1, zone_list = ('z1', 'z2', 'z3');
create tenant oracle replica_num = 3, resource_pool_list=('tpch_pool1') set ob_tcp_invited_nodes='%', ob_compatibility_mode='oracle';
alter tenant oracle set variables autocommit='on';
alter tenant oracle set variables nls_date_format='yyyy-mm-dd hh24:mi:ss';
alter tenant oracle set variables nls_timestamp_format='yyyy-mm-dd hh24:mi:ss.ff';
alter tenant oracle set variables nls_timestamp_tz_format='yyyy-mm-dd hh24:mi:ss.ff tzr tzd';
alter tenant oracle set variables ob_query_timeout=7200000000;
alter tenant oracle set variables ob_trx_timeout=7200000000;
alter tenant oracle set variables max_allowed_packet=67108864;
alter tenant oracle set variables ob_enable_jit='OFF';
alter tenant oracle set variables ob_sql_work_area_percentage=80;
alter tenant oracle set variables parallel_max_servers=512;
alter tenant oracle set variables parallel_servers_target=512;
select count(*) from oceanbase.__all_server group by zone limit 1 into @num;
set @sql_text = concat('alter resource pool tpch_pool1', ' unit_num = ', @num);
prepare stmt from @sql_text;
execute stmt;
deallocate prepare stmt;
......@@ -20,17 +20,19 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_CLEAN_PATH_FAILED
global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/*' % (path))
ret = client.execute_command('rm -fr %s/' % (path))
if not ret:
global global_ret
global_ret = False
stdio.warn('fail to clean %s:%s' % (server, path))
stdio.warn(EC_CLEAN_PATH_FAILED.format(server=server, path=path))
else:
stdio.verbose('%s:%s cleaned' % (server, path))
cluster_config = plugin_context.cluster_config
......
......@@ -46,7 +46,7 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
if comp in depends:
have_depend = True
for server in cluster_config.servers:
obs_config = cluster_config.get_depled_config(comp, server)
obs_config = cluster_config.get_depend_config(comp, server)
if obs_config is not None:
server_depends[server].append(comp)
......
......@@ -20,6 +20,8 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_FAIL_TO_INIT_PATH, InitDirFailedErrorMessage
def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
cluster_config = plugin_context.cluster_config
......@@ -32,25 +34,25 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
server_config = cluster_config.get_server_conf(server)
client = clients[server]
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
stdio.verbose('%s init cluster work home', server)
if force:
ret = client.execute_command('rm -fr %s/*' % home_path)
ret = client.execute_command('rm -fr %s' % home_path)
if not ret:
global_ret = False
stdio.error('failed to initialize %s home path: %s' % (server, ret.stderr))
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
continue
else:
if client.execute_command('mkdir -p %s' % home_path):
ret = client.execute_command('ls %s' % (home_path))
if not ret or ret.stdout.strip():
global_ret = False
stdio.error('fail to init %s home path: %s is not empty' % (server, home_path))
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=home_path)))
continue
else:
global_ret = False
stdio.error('fail to init %s home path: create %s failed' % (server, home_path))
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=home_path)))
continue
if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib,conf,log}'" % (home_path)) \
......@@ -58,7 +60,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
global_ret = False
stdio.error('fail to init %s home path', server)
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.PATH_ONLY.format(path=home_path)))
if global_ret:
stdio.stop_loading('succeed')
......
......@@ -26,6 +26,8 @@ from copy import deepcopy
from glob import glob
from tool import YamlLoader
from _errno import *
def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
stdio = plugin_context.stdio
......@@ -46,8 +48,8 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
root_servers = {}
ob_config = cluster_config.get_depled_config(comp)
new_ob_config = new_cluster_config.get_depled_config(comp)
ob_config = cluster_config.get_depend_config(comp)
new_ob_config = new_cluster_config.get_depend_config(comp)
ob_config = {} if ob_config is None else ob_config
new_ob_config = {} if new_ob_config is None else new_ob_config
for key in config_map:
......@@ -66,6 +68,7 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
config_kv[key] = key
global_ret = True
stdio.start_loading('Reload obagent')
for server in servers:
change_conf = deepcopy(global_change_conf)
client = clients[server]
......@@ -79,6 +82,18 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
if key not in config_kv:
continue
if key not in config or config[key] != new_config[key]:
item = cluster_config.get_temp_conf_item(key)
if item:
if item.need_redeploy or item.need_restart:
stdio.verbose('%s can not be reload' % key)
global_ret = False
continue
try:
item.modify_limit(config.get(key), new_config.get(key))
except Exception as e:
stdio.verbose('%s: %s' % (server, str(e)))
global_ret = False
continue
change_conf[config_kv[key]] = new_config[key]
if change_conf:
......@@ -93,6 +108,11 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
)
if not client.execute_command(cmd):
global_ret = False
stdio.error('fail to reload %s' % server)
stdio.error(EC_OBAGENT_RELOAD_FAILED.format(server=server))
return plugin_context.return_true() if global_ret else None
if global_ret:
stdio.stop_loading('succeed')
return plugin_context.return_true()
else:
stdio.stop_loading('fail')
return
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
class Restart(object):
def __init__(self, plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None):
self.local_home_path = local_home_path
self.plugin_context = plugin_context
self.components = plugin_context.components
self.clients = plugin_context.clients
self.cluster_config = plugin_context.cluster_config
self.stdio = plugin_context.stdio
self.repository = repository
self.start_plugin = start_plugin
self.reload_plugin = reload_plugin
self.connect_plugin = connect_plugin
self.display_plugin = display_plugin
self.stop_plugin = stop_plugin
self.new_clients = new_clients
self.new_cluster_config = new_cluster_config
self.sub_io = self.stdio.sub_io()
def dir_read_check(self, client, path):
if not client.execute_command('cd %s' % path):
dirpath, name = os.path.split(path)
return self.dir_read_check(client, dirpath) and client.execute_command('sudo chmod +1 %s' % path)
return True
def restart(self):
clients = self.clients
self.stdio.verbose('Call %s for %s' % (self.stop_plugin, self.repository))
if not self.stop_plugin(self.components, clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io):
self.stdio.stop_loading('stop_loading', 'fail')
return False
if self.new_clients:
self.stdio.verbose('use new clients')
for server in self.cluster_config.servers:
new_client = self.new_clients[server]
server_config = self.cluster_config.get_server_conf(server)
home_path = server_config['home_path']
if not new_client.execute_command('sudo chown -R %s: %s' % (new_client.config.username, home_path)):
self.stdio.stop_loading('stop_loading', 'fail')
return False
self.dir_read_check(new_client, home_path)
clients = self.new_clients
cluster_config = self.new_cluster_config if self.new_cluster_config else self.cluster_config
self.stdio.verbose('Call %s for %s' % (self.start_plugin, self.repository))
if not self.start_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, local_home_path=self.local_home_path, repository_dir=self.repository.repository_dir):
self.rollback()
self.stdio.stop_loading('stop_loading', 'fail')
return False
return self.display_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, cursor=None)
def rollback(self):
if self.new_clients:
self.stop_plugin(self.components, self.new_clients, self.new_cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
for server in self.cluster_config.servers:
client = self.clients[server]
new_client = self.new_clients[server]
server_config = self.cluster_config.get_server_conf(server)
home_path = server_config['home_path']
new_client.execute_command('sudo chown -R %s: %s' % (client.config.username, home_path))
def restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None, rollback=False, *args, **kwargs):
task = Restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config, new_clients)
call = task.rollback if rollback else task.restart
if call():
plugin_context.return_true()
......@@ -34,6 +34,7 @@ from Crypto import Random
from Crypto.Cipher import AES
from tool import YamlLoader
from _errno import *
stdio = None
......@@ -155,7 +156,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
"zone_name": "zone",
}
stdio.start_loading('Start obproxy')
stdio.start_loading('Start obagent')
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
......@@ -203,7 +204,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if use_parameter:
for comp in ['oceanbase', 'oceanbase-ce']:
obs_config = cluster_config.get_depled_config(comp, server)
obs_config = cluster_config.get_depend_config(comp, server)
if obs_config is not None:
break
......@@ -244,7 +245,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
tf.write(text)
tf.flush()
if not client.put_file(tf.name, path.replace(repository_dir, home_path)):
stdio.error('Fail to send config file to %s' % server)
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
......@@ -256,7 +257,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
else:
ret = client.put_file(path, path.replace(repository_dir, home_path))
if not ret:
stdio.error('Fail to send config file to %s' % server)
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
......@@ -284,7 +285,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
with tempfile.NamedTemporaryFile(suffix=".yaml") as tf:
yaml.dump(config, tf)
if not client.put_file(tf.name, os.path.join(home_path, 'conf/monagent.yaml')):
stdio.error('Fail to send config file to %s' % server)
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
......
......@@ -20,6 +20,8 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_CONFIG_CONFLICT_PORT
stdio = None
success = True
......@@ -74,7 +76,7 @@ def start_check(plugin_context, strict_check=False, *args, **kwargs):
port = int(server_config[key])
alert_f = alert if key == 'pprof_port' else critical
if port in ports:
alert_f('Configuration conflict %s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key']))
alert_f(EC_CONFIG_CONFLICT_PORT.format(server1=server, port=port, server2=ports[port]['server'], key=ports[port]['key']))
continue
ports[port] = {
'server': server,
......
......@@ -42,7 +42,7 @@ def upgrade(plugin_context, search_py_script_plugin, apply_param_plugin, *args,
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
......
......@@ -34,6 +34,7 @@ from Crypto import Random
from Crypto.Cipher import AES
from tool import YamlLoader
from _errno import *
stdio = None
......@@ -156,7 +157,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
"ob_install_path": "home_path"
}
stdio.start_loading('Start obproxy')
stdio.start_loading('Start obagent')
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
......@@ -198,7 +199,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
continue
for comp in ['oceanbase', 'oceanbase-ce']:
obs_config = cluster_config.get_depled_config(comp, server)
obs_config = cluster_config.get_depend_config(comp, server)
if obs_config is not None:
break
......@@ -239,7 +240,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
tf.write(text)
tf.flush()
if not client.put_file(tf.name, path.replace(repository_dir, home_path)):
stdio.error('Fail to send config file to %s' % server)
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
......@@ -251,7 +252,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
else:
ret = client.put_file(path, path.replace(repository_dir, home_path))
if not ret:
stdio.error('Fail to send config file to %s' % server)
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
......@@ -279,7 +280,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
with tempfile.NamedTemporaryFile(suffix=".yaml") as tf:
yaml.dump(config, tf)
if not client.put_file(tf.name, os.path.join(home_path, 'conf/monagent.yaml')):
stdio.error('Fail to send config file to %s' % server)
stdio.error(EC_OBAGENT_SEND_CONFIG_FAILED.format(server=server))
stdio.stop_loading('fail')
return
......
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from _errno import EC_CONFIG_CONFLICT_PORT
stdio = None
success = True
def get_port_socket_inode(client, port):
port = hex(port)[2:].zfill(4).upper()
cmd = "bash -c 'cat /proc/net/{tcp,udp}' | awk -F' ' '{print $2,$10}' | grep '00000000:%s' | awk -F' ' '{print $2}' | uniq" % port
res = client.execute_command(cmd)
if not res or not res.stdout.strip():
return False
stdio.verbose(res.stdout)
return res.stdout.strip().split('\n')
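get_port_socket_inode matches the local-address column of /proc/net/tcp, which stores ports as four uppercase hex digits. A quick check of the conversion used above:
port = 2881
print(hex(port)[2:].zfill(4).upper())   # -> '0B41'
# The awk/grep pipeline then selects sockets bound to '00000000:0B41'
# (i.e. 0.0.0.0:2881) and returns their inodes, one per line.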
def start_check(plugin_context, strict_check=False, *args, **kwargs):
# def alert(*arg, **kwargs):
# global success
# if strict_check:
# success = False
# stdio.error(*arg, **kwargs)
# else:
# stdio.warn(*arg, **kwargs)
def critical(*arg, **kwargs):
global success
success = False
stdio.error(*arg, **kwargs)
global stdio
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
servers_port = {}
stdio.start_loading('Check before start obagent')
for server in cluster_config.servers:
ip = server.ip
client = clients[server]
server_config = cluster_config.get_server_conf(server)
port = int(server_config["server_port"])
prometheus_port = int(server_config["pprof_port"])
remote_pid_path = "%s/run/obagent-%s-%s.pid" % (server_config['home_path'], server.ip, server_config["server_port"])
remote_pid = client.execute_command("cat %s" % remote_pid_path).stdout.strip()
if remote_pid:
if client.execute_command('ls /proc/%s' % remote_pid):
continue
if ip not in servers_port:
servers_port[ip] = {}
ports = servers_port[ip]
server_config = cluster_config.get_server_conf_with_default(server)
stdio.verbose('%s port check' % server)
for key in ['server_port', 'pprof_port']:
port = int(server_config[key])
# alert_f = alert if key == 'pprof_port' else critical
if port in ports:
critical(EC_CONFIG_CONFLICT_PORT.format(server1=server, port=port, server2=ports[port]['server'], key=ports[port]['key']))
continue
ports[port] = {
'server': server,
'key': key
}
if get_port_socket_inode(client, port):
critical('%s:%s port is already used' % (ip, port))
if success:
stdio.stop_loading('succeed')
plugin_context.return_true()
else:
stdio.stop_loading('fail')
\ No newline at end of file
......@@ -77,14 +77,14 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
ob_config = cluster_config.get_depled_config(comp)
ob_config = cluster_config.get_depend_config(comp)
if not ob_config:
continue
odp_config = cluster_config.get_global_conf()
config_map = {
'observer_sys_password': 'proxyro_password',
'cluster_name': 'appname',
'root_password': 'observer_root_password'
'observer_root_password': 'root_password'
}
for key in config_map:
ob_key = config_map[key]
......@@ -101,14 +101,12 @@ def connect(plugin_context, target_server=None, sys_root=True, *args, **kwargs):
server_config = cluster_config.get_server_conf(server)
if sys_root:
pwd_key = 'obproxy_sys_password'
default_pwd = 'proxysys'
else:
pwd_key = 'observer_root_password'
default_pwd = ''
r_password = password if password else server_config.get(pwd_key)
if r_password is None:
r_password = ''
db, cursor = _connect(server.ip, server_config['listen_port'], user, r_password if count % 2 else default_pwd)
db, cursor = _connect(server.ip, server_config['listen_port'], user, r_password if count % 2 else '')
dbs[server] = db
cursors[server] = cursor
except:
......
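# Illustration: the change above drops the hard-coded 'proxysys' default and
# alternates between the configured password and an empty string across retry
# attempts. The retry shape, as a sketch (connect_fn stands in for the
# plugin's _connect helper):
def connect_with_fallback(connect_fn, ip, port, user, password, attempts=4):
    last_error = None
    for count in range(attempts, 0, -1):
        try:
            # odd counts try the configured password, even counts an empty one
            return connect_fn(ip, port, user, password if count % 2 else '')
        except Exception as e:
            last_error = e
    raise last_error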
......@@ -20,18 +20,20 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_CLEAN_PATH_FAILED
global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/* %s/.conf' % (path, path))
ret = client.execute_command('rm -fr %s/' % (path))
if not ret:
            # print stderr
global global_ret
global_ret = False
stdio.warn('fail to clean %s:%s' % (server, path))
stdio.warn(EC_CLEAN_PATH_FAILED.format(server=server, path=path))
else:
stdio.verbose('%s:%s cleaned' % (server, path))
cluster_config = plugin_context.cluster_config
......
......@@ -19,6 +19,8 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_FAIL_TO_INIT_PATH, InitDirFailedErrorMessage
def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
cluster_config = plugin_context.cluster_config
......@@ -31,20 +33,20 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
server_config = cluster_config.get_server_conf(server)
client = clients[server]
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
stdio.verbose('%s init cluster work home', server)
if force:
ret = client.execute_command('rm -fr %s/*' % home_path)
ret = client.execute_command('rm -fr %s' % home_path)
if not ret:
global_ret = False
stdio.error('failed to initialize %s home path: %s' % (server, ret.stderr))
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
continue
if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib}'" % (home_path)) \
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
global_ret = False
stdio.error('fail to init %s home path', server)
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=home_path)))
if global_ret:
stdio.stop_loading('succeed')
......
......@@ -19,6 +19,7 @@ function start() {
if [ $? != 0 ]; then
exit $?
fi
kill -9 $pid
while [ 1 ];
do
......
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from _rpm import Version
def ocp_check(plugin_context, ocp_version, cursor, new_cluster_config=None, new_clients=None, *args, **kwargs):
cluster_config = new_cluster_config if new_cluster_config else plugin_context.cluster_config
clients = new_clients if new_clients else plugin_context.clients
stdio = plugin_context.stdio
is_admin = True
can_sudo = True
only_one = True
min_version = Version('3.1.1')
max_version = min_version
ocp_version = Version(ocp_version)
if ocp_version < min_version:
stdio.error('The current plugin version does not support OCP V%s' % ocp_version)
return
if ocp_version > max_version:
stdio.warn('The plugin library does not support OCP V%s. The takeover requirements are not applicable to the current check.' % ocp_version)
for server in cluster_config.servers:
client = clients[server]
if is_admin and client.config.username != 'admin':
is_admin = False
            stdio.error('The current user must be the admin user. Run the edit-config command to modify the user.username field.')
if can_sudo and not client.execute_command('sudo whoami'):
can_sudo = False
stdio.error('The user must have the privilege to run sudo commands without a password.')
if not client.execute_command('bash -c "if [ `pgrep obproxy | wc -l` -gt 1 ]; then exit 1; else exit 0;fi;"'):
only_one = False
            stdio.error('%s: Multiple OBProxies exist.' % server)
if is_admin and can_sudo and only_one:
stdio.print('Configurations of the OBProxy can be taken over by OCP after they take effect.' if new_cluster_config else 'Configurations of the OBProxy can be taken over by OCP.')
return plugin_context.return_true()
\ No newline at end of file
......@@ -37,8 +37,8 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
root_servers = {}
ob_config = cluster_config.get_depled_config(comp)
new_ob_config = new_cluster_config.get_depled_config(comp)
ob_config = cluster_config.get_depend_config(comp)
new_ob_config = new_cluster_config.get_depend_config(comp)
ob_config = {} if ob_config is None else ob_config
new_ob_config = {} if new_ob_config is None else new_ob_config
for key in config_map:
......@@ -56,6 +56,18 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
stdio.verbose('compare configuration of %s' % (server))
for key in new_config:
if key not in config or config[key] != new_config[key]:
item = cluster_config.get_temp_conf_item(key)
if item:
if item.need_redeploy or item.need_restart:
                        stdio.verbose('%s cannot be reloaded' % key)
global_ret = False
continue
try:
item.modify_limit(config.get(key), new_config.get(key))
except Exception as e:
stdio.verbose('%s: %s' % (server, str(e)))
global_ret = False
continue
change_conf[server][key] = new_config[key]
if key not in global_change_conf:
global_change_conf[key] = 1
......@@ -64,6 +76,7 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
servers_num = len(servers)
stdio.verbose('apply new configuration')
    stdio.start_loading('Reload obproxy')
success_conf = {}
sql = ''
value = None
......@@ -87,4 +100,10 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
for server in success_conf[key]:
value = change_conf[server][key]
            cluster_config.update_server_conf(server, key, value, False)
return plugin_context.return_true() if global_ret else None
    if global_ret:
        stdio.stop_loading('succeed')
        return plugin_context.return_true()
    else:
        stdio.stop_loading('fail')
        return
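# Illustration: the reload flow above reduces to diffing old vs. new per-server
# configuration, skipping keys whose definition requires a restart/redeploy,
# and counting how many servers change each key. A condensed sketch of the
# diffing step (the temp-conf-item checks are elided):
def collect_changes(servers, old_conf, new_conf):
    change_conf, global_change_conf = {}, {}
    for server in servers:
        change_conf[server] = {}
        for key, value in new_conf[server].items():
            if old_conf[server].get(key) == value:
                continue
            change_conf[server][key] = value
            global_change_conf[key] = global_change_conf.get(key, 0) + 1
    return change_conf, global_change_conf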
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
class Restart(object):
def __init__(self, plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None):
self.local_home_path = local_home_path
self.plugin_context = plugin_context
self.components = plugin_context.components
self.clients = plugin_context.clients
self.cluster_config = plugin_context.cluster_config
self.stdio = plugin_context.stdio
self.repository = repository
self.start_plugin = start_plugin
self.reload_plugin = reload_plugin
self.connect_plugin = connect_plugin
self.display_plugin = display_plugin
self.stop_plugin = stop_plugin
self.new_clients = new_clients
self.new_cluster_config = new_cluster_config
self.sub_io = self.stdio.sub_io()
self.dbs = None
self.cursors = None
# def close(self):
# if self.dbs:
# for server in self.cursors:
# self.cursors[server].close()
# for db in self.dbs:
# self.dbs[server].close()
# self.cursors = None
# self.dbs = None
def connect(self):
if self.cursors is None:
self.stdio.verbose('Call %s for %s' % (self.connect_plugin, self.repository))
self.sub_io.start_loading('Connect to obproxy')
ret = self.connect_plugin(self.components, self.clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
if not ret:
self.sub_io.stop_loading('fail')
return False
self.sub_io.stop_loading('succeed')
# self.close()
self.cursors = ret.get_return('cursor')
self.dbs = ret.get_return('connect')
return True
def dir_read_check(self, client, path):
if not client.execute_command('cd %s' % path):
dirpath, name = os.path.split(path)
return self.dir_read_check(client, dirpath) and client.execute_command('sudo chmod +1 %s' % path)
return True
def restart(self):
clients = self.clients
self.stdio.verbose('Call %s for %s' % (self.stop_plugin, self.repository))
if not self.stop_plugin(self.components, clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io):
self.stdio.stop_loading('stop_loading', 'fail')
return False
if self.new_clients:
self.stdio.verbose('use new clients')
for server in self.cluster_config.servers:
client = clients[server]
new_client = self.new_clients[server]
server_config = self.cluster_config.get_server_conf(server)
home_path = server_config['home_path']
if not new_client.execute_command('sudo chown -R %s: %s' % (new_client.config.username, home_path)):
self.stdio.stop_loading('stop_loading', 'fail')
return False
self.dir_read_check(new_client, home_path)
clients = self.new_clients
cluster_config = self.new_cluster_config if self.new_cluster_config else self.cluster_config
self.stdio.verbose('Call %s for %s' % (self.start_plugin, self.repository))
if not self.start_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, local_home_path=self.local_home_path, repository_dir=self.repository.repository_dir):
self.rollback()
self.stdio.stop_loading('stop_loading', 'fail')
return False
if self.connect():
ret = self.display_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, cursor=self.cursors)
if self.new_cluster_config:
self.stdio.verbose('Call %s for %s' % (self.reload_plugin, self.repository))
self.reload_plugin(self.components, self.clients, self.cluster_config, [], {}, self.sub_io,
cursor=self.cursors, new_cluster_config=self.new_cluster_config, repository_dir=self.repository.repository_dir)
return ret
return False
def rollback(self):
if self.new_clients:
self.stop_plugin(self.components, self.new_clients, self.new_cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
for server in self.cluster_config.servers:
client = self.clients[server]
new_client = self.new_clients[server]
server_config = self.cluster_config.get_server_conf(server)
home_path = server_config['home_path']
new_client.execute_command('sudo chown -R %s: %s' % (client.config.username, home_path))
def restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None, rollback=False, *args, **kwargs):
task = Restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config, new_clients)
call = task.rollback if rollback else task.restart
if call():
plugin_context.return_true()
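# Illustration: Restart.restart() above is a stop -> chown-for-new-clients ->
# start -> connect/display -> reload pipeline that rolls back file ownership
# when the new start fails. Its control flow, reduced to a skeleton (the task
# methods are hypothetical wrappers over the plugin calls):
def do_restart(task):
    if not task.stop():
        return False
    if task.new_clients and not task.chown_to_new_users():
        return False
    if not task.start():
        task.rollback()  # hand ownership back to the original users
        return False
    if not task.connect():
        return False
    ret = task.display()
    if task.new_cluster_config:
        task.reload()  # apply the config changes that motivated the restart
    return ret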
......@@ -100,12 +100,12 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
root_servers = {}
ob_config = cluster_config.get_depled_config(comp)
ob_config = cluster_config.get_depend_config(comp)
if not ob_config:
continue
odp_config = cluster_config.get_global_conf()
for server in cluster_config.get_depled_servers(comp):
config = cluster_config.get_depled_config(comp, server)
for server in cluster_config.get_depend_servers(comp):
config = cluster_config.get_depend_config(comp, server)
zone = config['zone']
if zone not in root_servers:
root_servers[zone] = '%s:%s' % (server.ip, config['mysql_port'])
......@@ -124,7 +124,6 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
error = False
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
if 'rs_list' not in server_config and 'obproxy_config_server_url' not in server_config:
error = True
......@@ -142,13 +141,20 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
need_bootstrap = True
break
if getattr(options, 'without_parameter', False) and need_bootstrap is False:
use_parameter = False
else:
# Bootstrap is required when starting with parameter, ensure the passwords are correct.
need_bootstrap = True
use_parameter = True
for server in cluster_config.servers:
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
if client.execute_command("bash -c 'if [ -f %s/bin/obproxy ]; then exit 1; else exit 0; fi;'" % home_path):
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
......@@ -156,11 +162,6 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
pid_path[server] = "%s/run/obproxy-%s-%s.pid" % (home_path, server.ip, server_config["listen_port"])
if getattr(options, 'without_parameter', False) and need_bootstrap is False:
use_parameter = False
else:
use_parameter = True
if use_parameter:
not_opt_str = [
'listen_port',
......@@ -170,7 +171,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
]
start_unuse = ['home_path', 'observer_sys_password', 'obproxy_sys_password', 'observer_root_password']
get_value = lambda key: "'%s'" % server_config[key] if isinstance(server_config[key], str) else server_config[key]
opt_str = ["obproxy_sys_password=e3fd448c516073714189b57233c9cf428ccb1bed"]
opt_str = ["obproxy_sys_password=''"]
for key in server_config:
if key not in start_unuse and key not in not_opt_str:
value = get_value(key)
......@@ -217,7 +218,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.start_loading('obproxy program health check')
failed = []
servers = cluster_config.servers
count = 8
count = 20
while servers and count:
count -= 1
tmp_servers = []
......@@ -236,7 +237,6 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
else:
client.execute_command('echo %s > %s' % (pid, pid_path[server]))
obproxyd(server_config["home_path"], client, server.ip, server_config["listen_port"])
client.execute_command('cat %s | xargs kill -9' % pid_path[server])
tmp_servers.append(server)
break
stdio.verbose('failed to start %s obproxy, remaining retries: %d' % (server, count))
......
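# Illustration: the health check above was loosened from 8 to 20 rounds, and
# the stray kill of freshly started daemons was removed. The generic shape of
# that bounded retry loop, as a sketch (is_started stands in for the
# pid-file/process probe):
import time

def wait_until_started(servers, is_started, count=20, interval=3):
    while servers and count:
        count -= 1
        servers = [s for s in servers if not is_started(s)]  # keep the laggards
        if servers and count:
            time.sleep(interval)
    return not servers  # True when every server came up within the budget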
......@@ -20,6 +20,8 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_CONFIG_CONFLICT_PORT
stdio = None
success = True
......@@ -74,7 +76,7 @@ def start_check(plugin_context, strict_check=False, *args, **kwargs):
port = int(server_config[key])
alert_f = alert if key == 'prometheus_listen_port' else critical
if port in ports:
alert_f('Configuration conflict %s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key']))
alert_f(EC_CONFIG_CONFLICT_PORT.format(server1=server, port=port, server2=ports[port]['server'], key=ports[port]['key']))
continue
ports[port] = {
'server': server,
......
......@@ -42,7 +42,7 @@ def upgrade(plugin_context, search_py_script_plugin, apply_param_plugin, *args,
client = clients[server]
server_config = cluster_config.get_server_conf(server)
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
......
......@@ -21,6 +21,7 @@
from __future__ import absolute_import, division, print_function
import time
from _deploy import InnerConfigItem
def bootstrap(plugin_context, cursor, *args, **kwargs):
......@@ -28,6 +29,11 @@ def bootstrap(plugin_context, cursor, *args, **kwargs):
stdio = plugin_context.stdio
bootstrap = []
floor_servers = {}
zones_config = {}
inner_config = {
InnerConfigItem('$_zone_idc'): 'idc'
}
inner_keys = inner_config.keys()
for server in cluster_config.servers:
server_config = cluster_config.get_server_conf(server)
zone = server_config['zone']
......@@ -35,7 +41,18 @@ def bootstrap(plugin_context, cursor, *args, **kwargs):
floor_servers[zone].append('%s:%s' % (server.ip, server_config['rpc_port']))
else:
floor_servers[zone] = []
zones_config[zone] = {}
bootstrap.append('REGION "sys_region" ZONE "%s" SERVER "%s:%s"' % (server_config['zone'], server.ip, server_config['rpc_port']))
zone_config = zones_config[zone]
for key in server_config:
if not isinstance(key, InnerConfigItem):
continue
if key not in inner_config:
continue
if key in zone_config:
continue
zone_config[key] = server_config[key]
try:
sql = 'set session ob_query_timeout=1000000000'
stdio.verbose('execute sql: %s' % sql)
......@@ -52,16 +69,22 @@ def bootstrap(plugin_context, cursor, *args, **kwargs):
global_conf = cluster_config.get_global_conf()
if 'proxyro_password' in global_conf or 'obproxy' in plugin_context.components:
value = global_conf['proxyro_password'] if global_conf.get('proxyro_password') is not None else ''
sql = 'create user "proxyro" IDENTIFIED BY "%s"' % value
sql = 'create user "proxyro" IDENTIFIED BY %s'
stdio.verbose(sql)
cursor.execute(sql)
sql = 'grant select on oceanbase.* to proxyro IDENTIFIED BY "%s"' % value
cursor.execute(sql, [value])
sql = 'grant select on oceanbase.* to proxyro IDENTIFIED BY %s'
stdio.verbose(sql)
cursor.execute(sql)
cursor.execute(sql, [value])
if global_conf.get('root_password') is not None:
sql = 'alter user "root" IDENTIFIED BY "%s"' % global_conf.get('root_password')
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql)
for zone in zones_config:
zone_config = zones_config[zone]
for key in zone_config:
sql = 'alter system modify zone %s set %s = %%s' % (zone, inner_config[key])
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql, [zone_config[key]])
stdio.stop_loading('succeed')
plugin_context.return_true()
except:
......
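# Illustration: the bootstrap hunk above stops interpolating passwords into the
# SQL text and binds them as parameters instead, so quotes or backslashes in a
# password can no longer break (or inject into) the statement. With any DB-API
# cursor (e.g. PyMySQL), the pattern is:
def create_proxyro(cursor, password):
    # old, fragile: 'create user "proxyro" IDENTIFIED BY "%s"' % password
    # new: let the driver escape and substitute the %s placeholder
    cursor.execute('create user "proxyro" IDENTIFIED BY %s', [password])
    cursor.execute('grant select on oceanbase.* to proxyro IDENTIFIED BY %s', [password])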
......@@ -24,6 +24,8 @@ from __future__ import absolute_import, division, print_function
import re
import time
from _errno import EC_OBSERVER_CAN_NOT_MIGRATE_IN
def parse_size(size):
_bytes = 0
......@@ -36,7 +38,7 @@ def parse_size(size):
return _bytes
def formate_size(size, precision=1):
def format_size(size, precision=1):
units = ['B', 'K', 'M', 'G', 'T', 'P']
idx = 0
if precision:
......@@ -121,7 +123,7 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
count -= 1
time.sleep(1)
if count == 0:
stdio.error('server can not migrate in')
stdio.error(EC_OBSERVER_CAN_NOT_MIGRATE_IN)
return
except:
exception('execute sql exception: %s' % sql)
......@@ -188,9 +190,9 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
if cpu_total < MIN_CPU:
return error('%s: resource not enough: cpu count less than %s' % (zone_list, MIN_CPU))
if mem_total < MIN_MEMORY:
return error('%s: resource not enough: memory less than %s' % (zone_list, formate_size(MIN_MEMORY)))
return error('%s: resource not enough: memory less than %s' % (zone_list, format_size(MIN_MEMORY)))
if disk_total < MIN_DISK_SIZE:
return error('%s: resource not enough: disk space less than %s' % (zone_list, formate_size(MIN_DISK_SIZE)))
return error('%s: resource not enough: disk space less than %s' % (zone_list, format_size(MIN_DISK_SIZE)))
max_cpu = get_option('max_cpu', cpu_total)
max_memory = parse_size(get_option('max_memory', mem_total))
......@@ -204,9 +206,9 @@ def create_tenant(plugin_context, cursor, *args, **kwargs):
if cpu_total < max_cpu:
return error('resource not enough: cpu (Avail: %s, Need: %s)' % (cpu_total, max_cpu))
if mem_total < max_memory:
return error('resource not enough: memory (Avail: %s, Need: %s)' % (formate_size(mem_total), formate_size(max_memory)))
return error('resource not enough: memory (Avail: %s, Need: %s)' % (format_size(mem_total), format_size(max_memory)))
if disk_total < max_disk_size:
return error('resource not enough: disk space (Avail: %s, Need: %s)' % (formate_size(disk_total), formate_size(max_disk_size)))
return error('resource not enough: disk space (Avail: %s, Need: %s)' % (format_size(disk_total), format_size(max_disk_size)))
if max_iops < MIN_IOPS:
return error('max_iops must greater than %d' % MIN_IOPS)
......
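# Illustration: create_tenant sizes resources with parse_size()/format_size()
# (the latter renamed from the misspelled formate_size). Minimal versions
# consistent with the usage above, assuming binary units and suffixes
# B/K/M/G/T/P:
def parse_size(size):
    units = {'B': 1, 'K': 1 << 10, 'M': 1 << 20, 'G': 1 << 30, 'T': 1 << 40, 'P': 1 << 50}
    size = str(size).strip().upper()
    if size[-1].isdigit():
        return int(size)  # a bare number is already a byte count
    return int(float(size[:-1]) * units[size[-1]])

def format_size(size, precision=1):
    units = ['B', 'K', 'M', 'G', 'T', 'P']
    idx = 0
    while idx < len(units) - 1 and size >= 1024:
        size /= 1024.0
        idx += 1
    return ('%%.%df%%s' % precision) % (size, units[idx])

# e.g. format_size(parse_size('5G')) == '5.0G'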
......@@ -20,18 +20,20 @@
from __future__ import absolute_import, division, print_function
from _errno import EC_CLEAN_PATH_FAILED
global_ret = True
def destroy(plugin_context, *args, **kwargs):
def clean(server, path):
client = clients[server]
ret = client.execute_command('rm -fr %s/* %s/.conf' % (path, path))
ret = client.execute_command('rm -fr %s/' % (path))
if not ret:
            # print stderr
global global_ret
global_ret = False
stdio.warn('fail to clean %s:%s' % (server, path))
stdio.warn(EC_CLEAN_PATH_FAILED.format(server=server, path=path))
else:
stdio.verbose('%s:%s cleaned' % (server, path))
cluster_config = plugin_context.cluster_config
......
......@@ -23,6 +23,8 @@ from __future__ import absolute_import, division, print_function
import re, os
from _errno import EC_OBSERVER_NOT_ENOUGH_MEMORY
def parse_size(size):
_bytes = 0
......@@ -145,7 +147,7 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
free_memory = parse_size(str(v))
memory_limit = free_memory
if memory_limit < MIN_MEMORY:
stdio.error('(%s) not enough memory. (Free: %s, Need: %s)' % (ip, format_size(free_memory), format_size(MIN_MEMORY)))
stdio.error(EC_OBSERVER_NOT_ENOUGH_MEMORY.format(ip=ip, free=format_size(free_memory), need=format_size(MIN_MEMORY)))
success = False
continue
memory_limit = max(MIN_MEMORY, memory_limit * 0.9)
......@@ -173,7 +175,7 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
if ret and ret.stdout.strip().isdigit():
cpu_num = int(ret.stdout)
server_config['cpu_count'] = max(16, int(cpu_num * - 2))
server_config['cpu_count'] = max(16, int(cpu_num - 2))
else:
server_config['cpu_count'] = 16
......
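# Illustration: generate_config fixes the cpu_count heuristic: `cpu_num * - 2`
# parses as cpu_num * (-2), which is always negative, so max() always chose the
# floor of 16; `cpu_num - 2` reserves two cores for the OS as intended. The
# sizing rules above, condensed (MIN_MEMORY here is an assumed floor, for
# illustration only):
MIN_MEMORY = 8 << 30

def auto_sizing(free_memory_bytes, cpu_num):
    memory_limit = max(MIN_MEMORY, int(free_memory_bytes * 0.9))  # keep 10% headroom
    cpu_count = max(16, cpu_num - 2)  # never below 16, never all cores
    return memory_limit, cpu_count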
......@@ -21,6 +21,8 @@
from __future__ import absolute_import, division, print_function
import os
from _errno import EC_CONFIG_CONFLICT_DIR, EC_FAIL_TO_INIT_PATH, InitDirFailedErrorMessage
stdio = None
force = False
......@@ -34,18 +36,18 @@ def critical(*arg, **kwargs):
def init_dir(server, client, key, path, link_path=None):
if force:
ret = client.execute_command('rm -fr %s/*' % path)
ret = client.execute_command('rm -fr %s' % path)
if not ret:
critical('fail to initialize %s %s path: %s permission denied' % (server, key, ret.stderr))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s path' % key, msg=ret.stderr))
return False
else:
if client.execute_command('mkdir -p %s' % path):
ret = client.execute_command('ls %s' % (path))
if not ret or ret.stdout.strip():
critical('fail to initialize %s %s path: %s is not empty' % (server, key, path))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s path' % key, msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=path)))
return False
else:
critical('fail to initialize %s %s path: create %s failed' % (server, key, path))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s path' % key, msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=path)))
return False
ret = client.execute_command('mkdir -p %s' % path)
if ret:
......@@ -53,7 +55,7 @@ def init_dir(server, client, key, path, link_path=None):
client.execute_command("if [ ! '%s' -ef '%s' ]; then ln -sf %s %s; fi" % (path, link_path, path, link_path))
return True
else:
critical('fail to initialize %s %s path: %s permission denied' % (server, key, ret.stderr))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s path' % key, msg=ret.stderr))
return False
......@@ -74,7 +76,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
server_config = cluster_config.get_server_conf(server)
client = clients[server]
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
if not server_config.get('data_dir'):
......@@ -94,7 +96,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
for key in keys:
path = server_config[key]
if path in dirs:
critical('Configuration conflict %s: %s is used for %s\'s %s' % (server, path, dirs[path]['server'], dirs[path]['key']))
critical(EC_CONFIG_CONFLICT_DIR.format(server1=server, path=path, server2=dirs[path]['server'], key=dirs[path]['key']))
continue
dirs[path] = {
'server': server,
......@@ -105,16 +107,16 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if force:
ret = client.execute_command('rm -fr %s/*' % home_path)
if not ret:
critical('failed to initialize %s home path: %s' % (server, ret.stderr))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
continue
else:
if client.execute_command('mkdir -p %s' % home_path):
ret = client.execute_command('ls %s' % (home_path))
if not ret or ret.stdout.strip():
critical('fail to init %s home path: %s is not empty' % (server, home_path))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=home_path)))
continue
else:
critical('fail to init %s home path: create %s failed' % (server, home_path))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=home_path)))
ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log,bin,lib}"' % home_path) \
and client.execute_command("if [ -d %s/bin ]; then ln -fs %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
and client.execute_command("if [ -d %s/lib ]; then ln -fs %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))
......@@ -123,16 +125,16 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if force:
ret = client.execute_command('rm -fr %s/*' % data_path)
if not ret:
critical('fail to init %s data path: %s permission denied' % (server, ret.stderr))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=data_path)))
continue
else:
if client.execute_command('mkdir -p %s' % data_path):
ret = client.execute_command('ls %s' % (data_path))
if not ret or ret.stdout.strip():
critical('fail to init %s data path: %s is not empty' % (server, data_path))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=data_path)))
continue
else:
critical('fail to init %s data path: create %s failed' % (server, data_path))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=data_path)))
ret = client.execute_command('bash -c "mkdir -p %s/sstable"' % data_path)
if ret:
link_path = '%s/store' % home_path
......@@ -143,26 +145,26 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if force:
ret = client.execute_command('rm -fr %s/*' % log_dir)
if not ret:
critical('fail to init %s %s dir: %s permission denied' % (server, key, ret.stderr))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s dir' % key, msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=log_dir)))
continue
else:
if client.execute_command('mkdir -p %s' % log_dir):
ret = client.execute_command('ls %s' % (log_dir))
if not ret or ret.stdout.strip():
critical('fail to init %s %s dir: %s is not empty' % (server, key, log_dir))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s dir' % key, msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=log_dir)))
continue
else:
critical('fail to init %s %s dir: create %s failed' % (server, key, log_dir))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s dir' % key, msg=InitDirFailedErrorMessage.CREATE_FAILED.format(path=log_dir)))
ret = client.execute_command('mkdir -p %s' % log_dir)
if ret:
link_path = '%s/%s' % (data_path, key)
client.execute_command("if [ ! '%s' -ef '%s' ]; then ln -sf %s %s; fi" % (log_dir, link_path, log_dir, link_path))
else:
critical('failed to initialize %s %s dir' % (server, key))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s dir' % key, msg=ret.stderr))
else:
critical('failed to initialize %s date path' % (server))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.PATH_ONLY.format(path=data_path)))
else:
critical('fail to init %s home path: %s permission denied' % (server, ret.stderr))
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=home_path)))
if global_ret:
stdio.stop_loading('succeed')
......
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from _rpm import Version
from _deploy import InnerConfigItem
def ocp_check(plugin_context, ocp_version, cursor, new_cluster_config=None, new_clients=None, *args, **kwargs):
cluster_config = new_cluster_config if new_cluster_config else plugin_context.cluster_config
clients = new_clients if new_clients else plugin_context.clients
stdio = plugin_context.stdio
is_admin = True
can_sudo = True
only_one = True
pwd_not_empty = True
min_version = Version('3.1.1')
max_version = min_version
ocp_version = Version(ocp_version)
if ocp_version < min_version:
stdio.error('The current plugin version does not support OCP V%s' % ocp_version)
return
if ocp_version > max_version:
stdio.warn('The plugin library does not support OCP V%s. The takeover requirements are not applicable to the current check.' % ocp_version)
for server in cluster_config.servers:
client = clients[server]
if is_admin and client.config.username != 'admin':
is_admin = False
            stdio.error('The current user must be the admin user. Run the edit-config command to modify the user.username field.')
if can_sudo and not client.execute_command('sudo whoami'):
can_sudo = False
stdio.error('The user must have the privilege to run sudo commands without a password.')
if not client.execute_command('bash -c "if [ `pgrep observer | wc -l` -gt 1 ]; then exit 1; else exit 0;fi;"'):
only_one = False
            stdio.error('%s: Multiple observers exist.' % server)
try:
cursor.execute("select * from oceanbase.__all_user where user_name = 'root' and passwd = ''")
if cursor.fetchone() and not cluster_config.get_global_conf().get("root_password"):
pwd_not_empty = False
stdio.error('The password of root@sys is empty. Run the edit-config command to modify the root_password value of %s.' % cluster_config.name)
except:
if not cluster_config.get_global_conf().get("root_password"):
pwd_not_empty = False
zones = {}
try:
cursor.execute("select zone from oceanbase.__all_zone where name = 'idc' and info = ''")
ret = cursor.fetchall()
if ret:
for row in ret:
zones[str(row['zone'])] = 1
finally:
for server in cluster_config.servers:
config = cluster_config.get_server_conf(server)
zone = str(config.get('zone'))
if zone in zones and config.get('$_zone_idc'):
keys = list(config.keys())
if '$_zone_idc' in keys and isinstance(keys[keys.index('$_zone_idc')], InnerConfigItem):
del zones[zone]
if zones:
if not cluster_config.parser or cluster_config.parser.STYLE == 'default':
stdio.error('Zone: IDC information is missing for %s. Run the chst command to change the configuration style of %s to cluster, and then run the edit-config command to add IDC information.' % (','.join(zones.keys()), cluster_config.name))
else:
stdio.error('Zone: IDC information is missing for %s. Run the edit-config command to add IDC information.' % ','.join(zones.keys()))
if is_admin and can_sudo and only_one and pwd_not_empty and not zones:
stdio.print('Configurations of the %s can be taken over by OCP after they take effect.' % cluster_config.name if new_cluster_config else 'Configurations of the %s can be taken over by OCP.' % cluster_config.name)
return plugin_context.return_true()
\ No newline at end of file
......@@ -20,11 +20,19 @@
from __future__ import absolute_import, division, print_function
from _deploy import InnerConfigItem
from _errno import EC_OBSERVER_INVALID_MODFILY_GLOBAL_KEY
def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
stdio = plugin_context.stdio
cluster_config = plugin_context.cluster_config
servers = cluster_config.servers
inner_config = {
InnerConfigItem('$_zone_idc'): 'idc'
}
inner_keys = inner_config.keys()
zones_config = {}
cluster_server = {}
change_conf = {}
global_change_conf = {}
......@@ -41,34 +49,65 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
for key in new_config:
n_value = new_config[key]
if key not in config or config[key] != n_value:
change_conf[server][key] = n_value
if key not in global_change_conf:
global_change_conf[key] = {'value': n_value, 'count': 1}
elif n_value == global_change_conf[key]['value']:
global_change_conf[key]['count'] += 1
if isinstance(key, InnerConfigItem) and key in inner_keys:
zone = config['zone']
if zone not in zones_config:
zones_config[zone] = {}
zones_config[zone][key] = n_value
else:
item = cluster_config.get_temp_conf_item(key)
if item:
if item.need_redeploy or item.need_restart:
                            stdio.verbose('%s cannot be reloaded' % key)
global_ret = False
continue
try:
item.modify_limit(config.get(key), n_value)
except Exception as e:
global_ret = False
stdio.verbose('%s: %s' % (server, str(e)))
continue
change_conf[server][key] = n_value
if key not in global_change_conf:
global_change_conf[key] = {'value': n_value, 'count': 1}
elif n_value == global_change_conf[key]['value']:
global_change_conf[key]['count'] += 1
servers_num = len(servers)
stdio.verbose('apply new configuration')
    stdio.start_loading('Reload observer')
for zone in zones_config:
zone_config = zones_config[zone]
for key in zone_config:
msg = ''
try:
msg = sql = 'alter system modify zone %s set %s = %%s' % (zone, inner_config[key])
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql, [zone_config[key]])
stdio.verbose('%s ok' % sql)
except:
global_ret = False
stdio.exception('execute sql exception: %s' % msg)
for key in global_change_conf:
msg = ''
try:
if key in ['proxyro_password', 'root_password']:
if global_change_conf[key]['count'] != servers_num:
stdio.warn('Invalid: proxyro_password is not a single server configuration item')
stdio.warn(EC_OBSERVER_INVALID_MODFILY_GLOBAL_KEY.format(key=key))
continue
value = change_conf[server][key] if change_conf[server].get(key) is not None else ''
user = key.split('_')[0]
msg = sql = 'CREATE USER IF NOT EXISTS %s IDENTIFIED BY "%s"' % (user, value)
msg = sql = 'CREATE USER IF NOT EXISTS %s IDENTIFIED BY %%s' % (user)
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql)
msg = sql = 'alter user "%s" IDENTIFIED BY "%s"' % (user, value)
cursor.execute(sql, [value])
msg = sql = 'alter user "%s" IDENTIFIED BY %%s' % (user)
stdio.verbose('execute sql: %s' % sql)
cursor.execute(sql)
cursor.execute(sql, [value])
continue
if global_change_conf[key]['count'] == servers_num:
sql = 'alter system set %s = %%s' % key
value = change_conf[server][key]
msg = sql % value
stdio.verbose('execute sql: %s' % msg)
cursor.execute(sql, [value])
cluster_config.update_global_conf(key, value, False)
......@@ -76,11 +115,9 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
for server in servers:
if key not in change_conf[server]:
continue
sql = 'alter system set %s = %%s server=%%s' % key
value = (change_conf[server][key], cluster_server[server])
msg = sql % value
msg = sql = 'alter system set %s = %%s server=%%s' % key
stdio.verbose('execute sql: %s' % msg)
cursor.execute(sql, value)
cursor.execute(sql, [change_conf[server][key], cluster_server[server]])
                cluster_config.update_server_conf(server, key, change_conf[server][key], False)
except:
global_ret = False
......@@ -89,4 +126,10 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
cursor.execute('alter system reload server')
cursor.execute('alter system reload zone')
cursor.execute('alter system reload unit')
return plugin_context.return_true() if global_ret else None
    if global_ret:
        stdio.stop_loading('succeed')
        return plugin_context.return_true()
    else:
        stdio.stop_loading('fail')
        return
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
import time
class Restart(object):
def __init__(self, plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None):
self.local_home_path = local_home_path
self.plugin_context = plugin_context
self.components = plugin_context.components
self.clients = plugin_context.clients
self.cluster_config = plugin_context.cluster_config
self.stdio = plugin_context.stdio
self.repository = repository
self.start_plugin = start_plugin
self.reload_plugin = reload_plugin
self.connect_plugin = connect_plugin
self.stop_plugin = stop_plugin
self.display_plugin = display_plugin
self.new_clients = new_clients
self.new_cluster_config = new_cluster_config
self.now_clients = {}
self.sub_io = self.stdio.sub_io()
self.db = None
self.cursor = None
for server in self.cluster_config.servers:
self.now_clients[server] = self.clients[server]
def close(self):
if self.db:
self.cursor.close()
self.db.close()
self.cursor = None
self.db = None
def connect(self):
if self.cursor is None or self.execute_sql('select version()', error=False) is False:
self.stdio.verbose('Call %s for %s' % (self.connect_plugin, self.repository))
self.sub_io.start_loading('Connect to observer')
ret = self.connect_plugin(self.components, self.clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
if not ret:
self.sub_io.stop_loading('fail')
return False
self.sub_io.stop_loading('succeed')
self.close()
self.cursor = ret.get_return('cursor')
self.db = ret.get_return('connect')
while self.execute_sql('use oceanbase', error=False) is False:
time.sleep(2)
self.execute_sql('set session ob_query_timeout=1000000000')
return True
def execute_sql(self, query, args=None, one=True, error=True):
msg = query % tuple(args) if args is not None else query
self.stdio.verbose("query: %s. args: %s" % (query, args))
try:
self.stdio.verbose('execute sql: %s' % msg)
self.cursor.execute(query, args)
result = self.cursor.fetchone() if one else self.cursor.fetchall()
result and self.stdio.verbose(result)
return result
except:
msg = 'execute sql exception: %s' % msg if error else ''
self.stdio.exception(msg)
return False
def broken_sql(self, sql, sleep_time=3):
while True:
ret = self.execute_sql(sql, error=False)
if ret is None:
break
time.sleep(sleep_time)
def wait(self):
if not self.connect():
return False
        self.stdio.verbose('server check')
self.broken_sql("select * from oceanbase.__all_server where status != 'active' or stop_time > 0 or start_service_time = 0")
self.broken_sql("select * from oceanbase.__all_virtual_clog_stat where is_in_sync= 0 and is_offline = 0")
return True
def start_zone(self, zone=None):
if not self.connect():
return False
if zone:
self.stdio.verbose('start zone %s' % zone)
start_sql = "alter system start zone %s" % zone
check_sql = "select * from oceanbase.__all_zone where name = 'status' and zone = '%s' and info != 'ACTIVE'" % zone
while True:
if self.execute_sql(start_sql, error=False) is None:
break
if self.execute_sql(check_sql, error=False) is None:
break
time.sleep(3)
self.wait()
return True
def stop_zone(self, zone):
if not self.wait():
return False
self.stdio.verbose('stop zone %s' % zone)
stop_sql = "alter system stop zone %s" % zone
check_sql = "select * from oceanbase.__all_zone where name = 'status' and zone = '%s' and info = 'ACTIVE'" % zone
while True:
if self.execute_sql(stop_sql, error=False) is None:
break
if self.execute_sql(check_sql, error=False):
break
time.sleep(3)
return True
def rollback(self):
if self.new_clients:
self.stdio.start_loading('Rollback')
self.stop_plugin(self.components, self.now_clients, self.new_cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
for server in self.cluster_config.servers:
client = self.clients[server]
new_client = self.now_clients[server]
server_config = self.cluster_config.get_server_conf(server)
home_path = server_config['home_path']
chown_cmd = 'sudo chown -R %s:' % client.config.username
for key in ['home_path', 'data_dir', 'redo_dir']:
if key in server_config:
chown_cmd += ' %s' % server_config[key]
new_client.execute_command(chown_cmd)
self.stdio.stop_loading('succeed')
def dir_read_check(self, client, path):
if not client.execute_command('cd %s' % path):
dirpath, name = os.path.split(path)
return self.dir_read_check(client, dirpath) and client.execute_command('sudo chmod +1 %s' % path)
return True
def _restart(self):
clients = self.clients
self.stdio.verbose('Call %s for %s' % (self.stop_plugin, self.repository))
if not self.stop_plugin(self.components, clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io):
self.stdio.stop_loading('stop_loading', 'fail')
return False
if self.new_clients:
self.stdio.verbose('use new clients')
for server in self.cluster_config.servers:
new_client = self.new_clients[server]
server_config = self.cluster_config.get_server_conf(server)
chown_cmd = 'sudo chown -R %s:' % new_client.config.username
for key in ['home_path', 'data_dir', 'redo_dir']:
if key in server_config:
chown_cmd += ' %s' % server_config[key]
if not new_client.execute_command(chown_cmd):
self.stdio.stop_loading('stop_loading', 'fail')
return False
self.dir_read_check(new_client, server_config['home_path'])
self.now_clients[server] = new_client
clients = self.new_clients
cluster_config = self.new_cluster_config if self.new_cluster_config else self.cluster_config
self.stdio.verbose('Call %s for %s' % (self.start_plugin, self.repository))
if not self.start_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, local_home_path=self.local_home_path, repository_dir=self.repository.repository_dir):
self.stdio.stop_loading('stop_loading', 'fail')
return False
return True
def rolling(self, zones_servers):
self.stdio.start_loading('Observer rotation restart')
all_servers = self.cluster_config.servers
pre_zone = None
for zone in zones_servers:
self.cluster_config.servers = zones_servers[zone]
if self.new_cluster_config:
self.new_cluster_config.servers = zones_servers[zone]
if not self.start_zone(pre_zone):
self.stdio.stop_loading('stop_loading', 'fail')
return False
while True:
for server in zones_servers[zone]:
config = self.cluster_config.get_server_conf(server)
sql = '''
select count(*) as cnt from oceanbase.__all_tenant as a left join (
select tenant_id, refreshed_schema_version
from oceanbase.__all_virtual_server_schema_info
where svr_ip = %s and svr_port = %s and refreshed_schema_version > 1
) as b on a.tenant_id = b.tenant_id
where b.tenant_id is null'''
if self.execute_sql(sql, args=(server.ip, config['rpc_port'])).get('cnt'):
break
else:
break
time.sleep(3)
while self.execute_sql("select * from oceanbase.__all_virtual_clog_stat where table_id = 1099511627777 and status != 'ACTIVE'"):
time.sleep(3)
self.stop_zone(zone)
if not self._restart():
return False
pre_zone = zone
if not self.start_zone(pre_zone):
self.stdio.stop_loading('stop_loading', 'fail')
return False
self.cluster_config.servers = all_servers
if self.new_cluster_config:
self.new_cluster_config.servers = all_servers
self.stdio.stop_loading('succeed')
return True
def un_rolling(self):
self.stdio.start_loading('Observer restart')
if not self._restart():
return False
self.wait()
self.stdio.stop_loading('succeed')
return True
def restart(self):
zones_servers = {}
all_servers = self.cluster_config.servers
if self.connect():
self.stdio.start_loading('Server check')
servers = self.execute_sql("select * from oceanbase.__all_server", one=False, error=False)
if len(self.cluster_config.servers) == len(servers):
for server in servers:
if server['status'] != 'active' or server['stop_time'] > 0 or server['start_service_time'] == 0:
break
else:
for server in self.cluster_config.servers:
config = self.cluster_config.get_server_conf_with_default(server)
zone = config['zone']
if zone not in zones_servers:
zones_servers[zone] = []
zones_servers[zone].append(server)
servers = self.cluster_config.servers
self.stdio.stop_loading('succeed')
ret = False
try:
if len(zones_servers) > 2:
ret = self.rolling(zones_servers)
else:
ret = self.un_rolling()
if ret and self.connect():
self.display_plugin(self.components, self.new_clients if self.new_clients else self.clients, self.new_cluster_config if self.new_cluster_config else self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, cursor=self.cursor)
if self.new_cluster_config:
self.stdio.verbose('Call %s for %s' % (self.reload_plugin, self.repository))
self.reload_plugin(self.components, self.clients, self.cluster_config, [], {}, self.sub_io,
cursor=self.cursor, new_cluster_config=self.new_cluster_config, repository_dir=self.repository.repository_dir)
except Exception as e:
self.stdio.exception('Run Exception: %s' % e)
finally:
self.cluster_config.servers = all_servers
if self.new_cluster_config:
self.new_cluster_config.servers = all_servers
if not ret:
self.rollback()
return ret
def restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None, rollback=False, *args, **kwargs):
task = Restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config, new_clients)
call = task.rollback if rollback else task.restart
if call():
plugin_context.return_true()
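# Illustration: Restart.restart() above chooses a rotation restart only when
# the cluster spans more than two zones, so a majority of zones stays serving
# while one zone at a time is stopped and restarted; smaller topologies are
# restarted in one shot. The zone grouping and the dispatch, condensed:
def plan_restart(servers, conf_of):
    zones_servers = {}
    for server in servers:
        zones_servers.setdefault(conf_of(server)['zone'], []).append(server)
    mode = 'rolling' if len(zones_servers) > 2 else 'un_rolling'
    return mode, zones_servers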
......@@ -26,6 +26,8 @@ import time
import requests
from copy import deepcopy
from _errno import EC_OBSERVER_FAIL_TO_START
def config_url(ocp_config_server, appname, cid):
cfg_url = '%s&Action=ObRootServiceInfo&ObCluster=%s' % (ocp_config_server, appname)
......@@ -97,7 +99,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
home_path = server_config['home_path']
if client.execute_command("bash -c 'if [ -f %s/bin/observer ]; then exit 1; else exit 0; fi;'" % home_path):
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
......@@ -139,7 +141,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
}
not_cmd_opt = [
'home_path', 'obconfig_url', 'root_password', 'proxyro_password',
'redo_dir', 'clog_dir', 'ilog_dir', 'slog_dir'
'redo_dir', 'clog_dir', 'ilog_dir', 'slog_dir', '$_zone_idc'
]
get_value = lambda key: "'%s'" % server_config[key] if isinstance(server_config[key], str) else server_config[key]
opt_str = []
......@@ -168,7 +170,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
client.add_env('LD_LIBRARY_PATH', '', True)
if not ret:
stdio.stop_loading('fail')
stdio.error('failed to start %s observer: %s' % (server, ret.stderr))
stdio.error(EC_OBSERVER_FAIL_TO_START.format(server=server) + ': ' + ret.stderr)
return
stdio.stop_loading('succeed')
......@@ -185,7 +187,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
stdio.verbose('%s observer[pid: %s] started', server, remote_pid)
else:
failed.append('failed to start %s observer' % server)
failed.append(EC_OBSERVER_FAIL_TO_START.format(server=server))
if failed:
stdio.stop_loading('fail')
for msg in failed:
......
......@@ -24,6 +24,8 @@ import os
import re
import time
from _errno import EC_OBSERVER_NOT_ENOUGH_DISK_4_CLOG, EC_CONFIG_CONFLICT_PORT, EC_OBSERVER_NOT_ENOUGH_MEMORY
stdio = None
success = True
......@@ -50,7 +52,7 @@ def parse_size(size):
return _bytes
def formate_size(size):
def format_size(size):
units = ['B', 'K', 'M', 'G', 'T', 'P']
idx = 0
while idx < 5 and size >= 1024:
......@@ -120,7 +122,7 @@ def _start_check(plugin_context, strict_check=False, *args, **kwargs):
for key in ['mysql_port', 'rpc_port']:
port = int(server_config[key])
if port in ports:
critical('Configuration conflict %s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key']))
critical(EC_CONFIG_CONFLICT_PORT.format(server1=server, port=port, server2=ports[port]['server'], key=ports[port]['key']))
continue
ports[port] = {
'server': server,
......@@ -214,7 +216,7 @@ def _start_check(plugin_context, strict_check=False, *args, **kwargs):
free_memory = parse_size(str(v))
total_use = servers_memory[ip]['percentage'] * total_memory / 100 + servers_memory[ip]['num']
if total_use > free_memory:
critical('(%s) not enough memory. (Free: %s, Need: %s)' % (ip, formate_size(free_memory), formate_size(total_use)))
                        stdio.error(EC_OBSERVER_NOT_ENOUGH_MEMORY.format(ip=ip, free=format_size(free_memory), need=format_size(total_use)))
# disk
disk = {'/': 0}
ret = client.execute_command('df --block-size=1024')
......@@ -275,11 +277,12 @@ def _start_check(plugin_context, strict_check=False, *args, **kwargs):
if need > 0 and threshold < 2:
alert('(%s) clog and data use the same disk (%s)' % (ip, p))
if need > avail:
critical('(%s) %s not enough disk space. (Avail: %s, Need: %s)' % (ip, p, formate_size(avail), formate_size(need)))
critical('(%s) %s not enough disk space. (Avail: %s, Need: %s)' % (ip, p, format_size(avail), format_size(need)))
elif 1.0 * (total - avail + need) / total > disk[p]['threshold']:
msg = '(%s) %s not enough disk space for clog. Use `redo_dir` to set other disk for clog' % (ip, p)
msg += ', or reduce the value of `datafile_size`' if need > 0 else '.'
critical(msg)
# msg = '(%s) %s not enough disk space for clog. Use `redo_dir` to set other disk for clog' % (ip, p)
# msg += ', or reduce the value of `datafile_size`' if need > 0 else '.'
# critical(msg)
critical(EC_OBSERVER_NOT_ENOUGH_DISK_4_CLOG.format(ip=ip, path=p))
if success:
for ip in servers_net_inferface:
......
......@@ -44,13 +44,16 @@ def get_port_socket_inode(client, port):
return res.stdout.strip().split('\n')
def confirm_port(client, pid, port):
def port_release_check(client, pid, port, count):
socket_inodes = get_port_socket_inode(client, port)
if not socket_inodes:
return False
ret = client.execute_command("ls -l /proc/%s/fd/ |grep -E 'socket:\[(%s)\]'" % (pid, '|'.join(socket_inodes)))
if ret and ret.stdout.strip():
return True
if count < 5:
ret = client.execute_command("ls -l /proc/%s/fd/ |grep -E 'socket:\[(%s)\]'" % (pid, '|'.join(socket_inodes)))
if ret:
return not ret.stdout.strip()
else:
return not client.execute_command("ls -l /proc/%s" % pid)
return False
......@@ -97,7 +100,6 @@ def stop(plugin_context, *args, **kwargs):
else:
stdio.verbose('%s observer is not running ...' % server)
count = 30
check = lambda client, pid, port: confirm_port(client, pid, port) if count < 5 else get_port_socket_inode(client, port)
time.sleep(1)
while count and servers:
tmp_servers = {}
......@@ -106,7 +108,7 @@ def stop(plugin_context, *args, **kwargs):
client = clients[server]
stdio.verbose('%s check whether the port is released' % server)
for key in ['rpc_port', 'mysql_port']:
if data[key] and check(data['client'], data['pid'], data[key]):
if data[key] and not port_release_check(data['client'], data['pid'], data[key], count):
tmp_servers[server] = data
break
data[key] = ''
......
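# Illustration: the reworked stop flow polls port_release_check() instead of
# flipping between two probes via a lambda. The intended decision logic, as a
# sketch: a port counts as released when nothing is bound to it any more; in
# the last rounds (count < 5) it is also enough that the surviving inodes no
# longer appear among the stopped pid's fds, or that the pid is gone entirely:
def is_port_released(inodes_on_port, pid_holds_inode, pid_alive, count):
    if not inodes_on_port:
        return True  # the port is free
    if count < 5:
        return (not pid_holds_inode) or (not pid_alive)
    return False  # early rounds: any bound inode still counts as held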
......@@ -137,7 +137,7 @@ class Upgrader(object):
self._connect_plugin = None
self._start_plugin = None
self._stop_plugin = None
self._display_plguin = None
self._display_plugin = None
self.local_home_path = local_home_path
self.exector_path = exector_path
self.components = plugin_context.components
......@@ -193,15 +193,15 @@ class Upgrader(object):
@property
def display_plugin(self):
if self._display_plguin is None:
self._display_plguin = self.search_py_script_plugin(self.route_index - 1, 'display')
return self._display_plguin
if self._display_plugin is None:
self._display_plugin = self.search_py_script_plugin(self.route_index - 1, 'display')
return self._display_plugin
def _clear_plugin(self):
self._connect_plugin = None
self._start_plugin = None
self._stop_plugin = None
self._display_plguin = None
self._display_plugin = None
def run(self):
total = len(self.route)
......@@ -392,7 +392,7 @@ class Upgrader(object):
client = self.clients[server]
server_config = self.cluster_config.get_server_conf(server)
home_path = server_config['home_path']
remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
remote_home_path = client.execute_command('echo ${OBD_HOME:-"$HOME"}/.obd').stdout.strip()
remote_repository_dir = repository_dir.replace(self.local_home_path, remote_home_path)
client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
client.execute_command("ln -fs %s/bin/* %s/bin" % (remote_repository_dir, home_path))
......
......@@ -83,7 +83,7 @@ def upgrade_check(plugin_context, current_repository, repositories, route, curso
succeed = False
stdio.error('No such file: %s .' % path)
if cant_use:
stdio.error('%s 不可用于升级,可以使用--disable禁用该镜像' % repository)
stdio.error('%s cannot be used for the upgrade. You can use the --disable option to disable the image.' % repository)
i += 1
if succeed:
......
......@@ -55,7 +55,7 @@ def upgrade_file_check(plugin_context, current_repository, repositories, route,
succeed = False
stdio.error('No such file: %s .' % path)
if cant_use:
stdio.error('%s 不可用于升级,可以使用--disable禁用该镜像' % repository)
stdio.error('%s cannot be used for the upgrade. You can use the --disable option to disable the image.' % repository)
i += 1
if succeed:
......
......@@ -170,7 +170,7 @@ def run_test(plugin_context, db, cursor, odp_db, odp_cursor=None, *args, **kwarg
tenant_variables_done = []
odp_configs = [
# [config name, new value, old value, replace condition: lambda n, o: n != o]
['enable_compression_protocol', False, False, lambda n, o: n != o],
# ['enable_compression_protocol', False, False, lambda n, o: n != o],
['proxy_mem_limited', format_size(min(max(threads * (8 << 10), 2 << 30), 4 << 30), 0), 0, lambda n, o: parse_size(n) > parse_size(o)],
['enable_prometheus', False, False, lambda n, o: n != o],
['enable_metadb_used', False, False, lambda n, o: n != o],
......
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
try:
    import subprocess32 as subprocess
except ImportError:
    import subprocess
import os
import time
import re
from ssh import LocalClient
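
# BenchmarkSQL (TPC-C) data-build plugin: waits for the tenant to accept
# connections, recreates the bmsql_* tables, loads warehouse data with the
# BenchmarkSQL LoadData tool, builds indexes, and verifies the row counts.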
def build(plugin_context, cursor, odp_cursor, *args, **kwargs):
    def get_option(key, default=''):
        value = getattr(options, key, default)
        if value is None:
            value = default
        stdio.verbose('get option: {} value {}'.format(key, value))
        return value

    def local_execute_command(command, env=None, timeout=None):
        return LocalClient.execute_command(command, env, timeout, stdio)
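
    # Feed a SQL script to obclient as <user>@<tenant>; force=True adds -f so
    # execution continues past errors (used when dropping tables that may not exist).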
    def run_sql(sql_file, force=False):
        sql_cmd = "{obclient} -h{host} -P{port} -u{user}@{tenant} {password_arg} -A {db} {force_flag} < {sql_file}".format(
            obclient=obclient_bin, host=host, port=port, user=user, tenant=tenant_name,
            password_arg=("-p'%s'" % password) if password else '',
            db=db_name,
            force_flag='-f' if force else '',
            sql_file=sql_file)
        return local_execute_command(sql_cmd)
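
    # Run `select count(*)` through obclient -E (vertical output) and parse the
    # count out of stdout; returns 0 if the table is missing or empty.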
    def get_table_rows(table_name):
        table_rows = 0
        ret = local_execute_command('%s "%s" -E' % (exec_sql_cmd, 'select count(*) from %s' % table_name))
        matched = re.match(r'.*count\(\*\):\s?(\d+)', ret.stdout, re.S)
        if matched:
            table_rows = int(matched.group(1))
        return table_rows

    stdio = plugin_context.stdio
    options = plugin_context.options
    bmsql_jar = get_option('bmsql_jar')
    bmsql_libs = get_option('bmsql_libs')
    host = get_option('host', '127.0.0.1')
    port = get_option('port', 2881)
    db_name = get_option('database', 'test')
    user = get_option('user', 'root')
    password = get_option('password', '')
    tenant_name = get_option('tenant', 'test')
    obclient_bin = get_option('obclient_bin', 'obclient')
    java_bin = get_option('java_bin', 'java')

    bmsql_classpath = kwargs.get('bmsql_classpath')
    if not bmsql_classpath:
        jars = [bmsql_jar]
        jars.extend(bmsql_libs.split(','))
        bmsql_classpath = '.:' + ':'.join(jars)
    bmsql_prop_path = kwargs.get('bmsql_prop_path')
    stdio.verbose('get bmsql_prop_path: {}'.format(bmsql_prop_path))
    warehouses = kwargs.get('warehouses', 0)

    stdio.verbose('Check connect ready')
    exec_sql_cmd = "%s -h%s -P%s -u%s@%s %s -A %s -e" % (
        obclient_bin, host, port, user, tenant_name, ("-p'%s'" % password) if password else '', db_name)
    stdio.start_loading('Connect to tenant %s' % tenant_name)
    try:
        while True:
            ret = local_execute_command('%s "%s" -E' % (exec_sql_cmd, 'select version();'))
            if ret:
                break
            time.sleep(10)
        stdio.stop_loading('succeed')
    except:
        stdio.stop_loading('fail')
        stdio.exception('')
        return

    # drop old tables
    bmsql_sql_path = kwargs.get('bmsql_sql_path', '')
    run_sql(sql_file=os.path.join(bmsql_sql_path, 'tableDrops.sql'), force=True)
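    # Dropping the old tables frees macro blocks asynchronously; poll the
    # internal __all_virtual_macro_block_marker_status view (up to ~300s) until
    # pending_free_count reaches 0 so the reload starts with reclaimed space.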
    retries = 300
    pending_free_count = -1
    while pending_free_count != 0 and retries > 0:
        retries -= 1
        sql = 'select pending_free_count from oceanbase.__all_virtual_macro_block_marker_status'
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
        ret = cursor.fetchone()
        stdio.verbose('sql result: %s' % ret)
        pending_free_count = ret.get('pending_free_count', 0) if ret else 0
        time.sleep(1)
    # create new tables
    if not run_sql(sql_file=os.path.join(bmsql_sql_path, 'tableCreates.sql')):
        stdio.error('create tables failed')
        return False

    # load data
    stdio.verbose('Start to load data.')
    cmd = '{java_bin} -cp {cp} -Dprop={prop} LoadData'.format(java_bin=java_bin, cp=bmsql_classpath, prop=bmsql_prop_path)
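    # LoadData runs as a child process; poll the bmsql_warehouse row count to
    # drive the progress bar while it works.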
    stdio.start_progressbar('Load data ', warehouses, widget_type='simple_progress')
    try:
        stdio.verbose('local execute: %s' % cmd)
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        while p.poll() is None:
            count = get_table_rows('bmsql_warehouse')
            if count:
                stdio.update_progressbar(min(count, warehouses - 1))
            time.sleep(10)
        code = p.returncode
        output = p.stdout.read().decode()
        verbose_msg = 'exited code %s' % code
        verbose_msg += ', output:\n%s' % output
    except:
        output = ''
        code = 255
        verbose_msg = 'unknown error'
        stdio.exception('')
    stdio.verbose(verbose_msg)
    if code != 0:
        stdio.interrupt_progressbar()
        stdio.error('Failed to load data.')
        return
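    # LoadData can exit 0 even when an individual worker failed, so also scan
    # its output for per-worker errors.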
    if re.match(r'.*Worker \d+: ERROR: .*', output, re.S):
        stdio.interrupt_progressbar()
        stdio.error('Failed to load data.')
        return
    stdio.finish_progressbar()

    # create index
    stdio.start_loading('create index')
    if not run_sql(sql_file=os.path.join(bmsql_sql_path, 'indexCreates.sql')):
        stdio.error('create index failed')
        stdio.stop_loading('fail')
        return
    stdio.stop_loading('succeed')

    # build finish
    stdio.start_loading('finish build')
    if not run_sql(sql_file=os.path.join(bmsql_sql_path, 'buildFinish.sql')):
        stdio.error('finish build failed')
        stdio.stop_loading('fail')
        return
    stdio.stop_loading('succeed')

    # check result
    stdio.start_loading('check data')
    try:
        assert get_table_rows('bmsql_warehouse') == warehouses, Exception('warehouse num wrong')
        assert get_table_rows('bmsql_district') == warehouses * 10, Exception('district num wrong')
        stdio.stop_loading('succeed')
    except Exception as e:
        stdio.stop_loading('fail')
        stdio.verbose(e)
        stdio.error('check data failed.')
        return
    return plugin_context.return_true()
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
from time import sleep
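
# Recovery plugin for the tpcc test: rolls back every tenant variable, system
# config, and obproxy config that run_test changed, using the *_done lists the
# test recorded (each entry is [name, new value, old value, replace condition]).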
def recover(plugin_context, cursor, odp_cursor, *args, **kwargs):
    def get_option(key, default=''):
        value = getattr(options, key, default)
        if value is None:
            value = default
        return value

    def execute(cursor, query, args=None):
        msg = query % tuple(args) if args is not None else query
        stdio.verbose('execute sql: %s' % msg)
        stdio.verbose("query: %s. args: %s" % (query, args))
        try:
            cursor.execute(query, args)
            return cursor.fetchone()
        except:
            msg = 'execute sql exception: %s' % msg
            stdio.exception(msg)
            raise Exception(msg)

    global stdio
    stdio = plugin_context.stdio
    options = plugin_context.options
    optimization = get_option('optimization') > 0
    tenant_name = get_option('tenant', 'test')
    tenant_variables_done = kwargs.get('tenant_variables_done', [])
    system_configs_done = kwargs.get('system_configs_done', [])
    odp_configs_done = kwargs.get('odp_configs_done', [])
    tenant_id = kwargs.get('tenant_id')
    stdio.verbose(cursor)
    stdio.verbose(vars(cursor))
    if optimization:
        stdio.start_loading('Recover')
        update_sql_t = "ALTER TENANT %s SET VARIABLES %%s = %%%%s" % tenant_name
        tenant_q = ' tenant="%s"' % tenant_name
        if not tenant_id:
            sql = "select * from oceanbase.gv$tenant where tenant_name = %s"
            stdio.verbose('execute sql: %s' % (sql % tenant_name))
            cursor.execute(sql, [tenant_name])
            tenant_meta = cursor.fetchone()
            if not tenant_meta:
                stdio.error('Tenant %s not exists. Use `obd cluster tenant create` to create tenant.' % tenant_name)
                return
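        # Restore in reverse order of application; the condition lambda
        # (config[3]) skips values that were never actually changed.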
        for config in tenant_variables_done[::-1]:
            if config[3](config[1], config[2]):
                sql = update_sql_t % config[0]
                execute(cursor, sql, [config[2]])
        for config in system_configs_done[::-1]:
            if config[0] == 'sleep':
                sleep(config[1])
                continue
            if config[3](config[1], config[2]):
                sql = 'alter system set %s=%%s' % config[0]
                if config[4]:
                    sql += tenant_q
                execute(cursor, sql, [config[2]])
        if odp_cursor:
            for config in odp_configs_done[::-1]:
                if config[3](config[1], config[2]):
                    sql = 'alter proxyconfig set %s=%%s' % config[0]
                    execute(odp_cursor, sql, [config[2]])
        stdio.stop_loading('succeed')
    return plugin_context.return_true()
# log4j.rootLogger=TRACE, CONSOLE, E, T
log4j.rootLogger=INFO, CONSOLE, E
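# CONSOLE prints INFO and above to stdout; E rolls WARN and above into
# benchmarksql-error.log; the T trace appender below stays defined but
# disabled (see the commented-out rootLogger line above).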
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=INFO
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern= %d{HH:mm:ss,SSS} [%t] %-5p %x %C{1} : %m%n
log4j.appender.E=org.apache.log4j.RollingFileAppender
log4j.appender.E.Threshold=WARN
log4j.appender.E.File=benchmarksql-error.log
log4j.appender.E.MaxFileSize=100MB
log4j.appender.E.MaxBackupIndex=1
log4j.appender.E.layout=org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern= %d{HH:mm:ss,SSS} [%t] %-5p %x %C{1} : %m%n
log4j.appender.T=org.apache.log4j.FileAppender
log4j.appender.T.Threshold=TRACE
log4j.appender.T.File=benchmarksql-trace.log
log4j.appender.T.append=false
log4j.appender.T.layout=org.apache.log4j.PatternLayout
log4j.appender.T.layout.ConversionPattern= %d{HH:mm:ss,SSS} [%t] %-5p %x %C{1} : %m%n
-- ----
-- Extra commands to run after the tables are created, loaded,
-- indexes built and extras created.
-- ----
create index bmsql_customer_idx1 on bmsql_customer (c_w_id, c_d_id, c_last, c_first) local;
create index bmsql_oorder_idx1 on bmsql_oorder (o_w_id, o_d_id, o_carrier_id, o_id) local;
\ No newline at end of file
alter table bmsql_customer drop index bmsql_customer_idx1;
alter table bmsql_oorder drop index bmsql_oorder_idx1;
\ No newline at end of file
drop table bmsql_config;
drop table bmsql_new_order;
drop table bmsql_order_line;
drop table bmsql_oorder;
drop table bmsql_history;
drop table bmsql_customer;
drop table bmsql_stock;
drop table bmsql_item;
drop table bmsql_district;
drop table bmsql_warehouse;
drop tablegroup tpcc_group;
\ No newline at end of file
create tablegroup if not exists tpch_tg_lineitem_order_group binding true partition by key 1 partitions 192;
create tablegroup if not exists tpch_tg_partsupp_part binding true partition by key 1 partitions 192;
drop table if exists lineitem;
drop table if exists orders;
drop table if exists partsupp;
drop table if exists part;
drop table if exists customer;
drop table if exists supplier;
drop table if exists nation;
drop table if exists region;
drop tablegroup if exists tpch_tg_lineitem_order_group;
drop tablegroup if exists tpch_tg_partsupp_part;
create tablegroup if not exists tpch_tg_lineitem_order_group binding true partition by key 1 partitions cpu_num;
create tablegroup if not exists tpch_tg_partsupp_part binding true partition by key 1 partitions cpu_num;
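-- Note: cpu_num is not literal SQL; it appears to be a placeholder the tpch
-- plugin substitutes with the tenant's CPU count before running this script.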
drop table if exists lineitem;
create table lineitem (
......@@ -21,7 +32,7 @@ drop table if exists lineitem;
l_comment varchar(44) default null,
primary key(l_orderkey, l_linenumber))
tablegroup = tpch_tg_lineitem_order_group
partition by key (l_orderkey) partitions 192;
partition by key (l_orderkey) partitions cpu_num;
create index I_L_ORDERKEY on lineitem(l_orderkey) local;
create index I_L_SHIPDATE on lineitem(l_shipdate) local;
......@@ -38,7 +49,7 @@ drop table if exists orders;
o_comment varchar(79) default null,
primary key (o_orderkey))
tablegroup = tpch_tg_lineitem_order_group
partition by key(o_orderkey) partitions 192;
partition by key(o_orderkey) partitions cpu_num;
create index I_O_ORDERDATE on orders(o_orderdate) local;
......@@ -51,7 +62,7 @@ drop table if exists partsupp;
ps_comment varchar(199) default null,
primary key (ps_partkey, ps_suppkey))
tablegroup tpch_tg_partsupp_part
partition by key(ps_partkey) partitions 192;
partition by key(ps_partkey) partitions cpu_num;
drop table if exists part;
......@@ -67,7 +78,7 @@ drop table if exists part;
p_comment varchar(23) default null,
primary key (p_partkey))
tablegroup tpch_tg_partsupp_part
partition by key(p_partkey) partitions 192;
partition by key(p_partkey) partitions cpu_num;
drop table if exists customer;
......@@ -81,7 +92,7 @@ drop table if exists customer;
c_mktsegment char(10) default null,
c_comment varchar(117) default null,
primary key (c_custkey))
partition by key(c_custkey) partitions 192;
partition by key(c_custkey) partitions cpu_num;
drop table if exists supplier;
create table supplier (
......@@ -93,7 +104,7 @@ drop table if exists supplier;
s_acctbal bigint default null,
s_comment varchar(101) default null,
primary key (s_suppkey)
) partition by key(s_suppkey) partitions 192;
) partition by key(s_suppkey) partitions cpu_num;
drop table if exists nation;
......