提交 ced0d500 编写于 作者: F frf12

v1.6.0

上级 b3ceb6cc
......@@ -25,15 +25,17 @@ import os
import sys
import time
import logging
import textwrap
from logging import handlers
from uuid import uuid1 as uuid
from optparse import OptionParser, OptionGroup, BadOptionError, Option
from optparse import OptionParser, OptionGroup, BadOptionError, Option, IndentedHelpFormatter
from core import ObdHome
from _stdio import IO
from log import Logger
from tool import DirectoryUtil, FileUtil, COMMAND_ENV
from _errno import DOC_LINK_MSG, LockError
from _environ import ENV_DEV_MODE
ROOT_IO = IO(1)
......@@ -49,7 +51,32 @@ FORBIDDEN_VARS = (CONST_OBD_HOME, CONST_OBD_INSTALL_PRE)
OBD_HOME_PATH = os.path.join(os.environ.get(CONST_OBD_HOME, os.getenv('HOME')), '.obd')
COMMAND_ENV.load(os.path.join(OBD_HOME_PATH, '.obd_environ'), ROOT_IO)
DEV_MODE = "OBD_DEV_MODE"
class OptionHelpFormatter(IndentedHelpFormatter):
def format_option(self, option):
result = []
opts = self.option_strings[option]
opt_width = self.help_position - self.current_indent - 2
if len(opts) > opt_width:
opts = "%*s%s\n" % (self.current_indent, "", opts)
indent_first = self.help_position
else: # start help on same line as opts
opts = "%*s%-*s " % (self.current_indent, "", opt_width, opts)
indent_first = 0
result.append(opts)
if option.help:
help_text = self.expand_default(option)
help_lines = help_text.split('\n')
if len(help_lines) == 1:
help_lines = textwrap.wrap(help_text, self.help_width)
result.append("%*s%s\n" % (indent_first, "", help_lines[0]))
result.extend(["%*s%s\n" % (self.help_position, "", line)
for line in help_lines[1:]])
elif opts[-1] != "\n":
result.append("\n")
return "".join(result)
class AllowUndefinedOptionParser(OptionParser):
......@@ -66,12 +93,15 @@ class AllowUndefinedOptionParser(OptionParser):
add_help_option=True,
prog=None,
epilog=None,
allow_undefine=True):
allow_undefine=True,
undefine_warn=True
):
OptionParser.__init__(
self, usage, option_list, option_class, version, conflict_handler,
description, formatter, add_help_option, prog, epilog
)
self.allow_undefine = allow_undefine
self.undefine_warn = undefine_warn
def warn(self, msg, file=None):
if self.IS_TTY:
......@@ -88,7 +118,7 @@ class AllowUndefinedOptionParser(OptionParser):
key = e.opt_str
value = value[len(key)+1:]
setattr(values, key.strip('-').replace('-', '_'), value if value != '' else True)
return self.warn(e)
self.undefine_warn and self.warn(e)
else:
raise e
......@@ -101,7 +131,7 @@ class AllowUndefinedOptionParser(OptionParser):
key = e.opt_str
value = value[len(key)+1:]
setattr(values, key.strip('-').replace('-', '_'), value if value != '' else True)
return self.warn(e)
self.undefine_warn and self.warn(e)
else:
raise e
......@@ -148,7 +178,7 @@ class BaseCommand(object):
self.parser.exit(1)
def _mk_usage(self):
return self.parser.format_help()
return self.parser.format_help(OptionHelpFormatter())
class ObdCommand(BaseCommand):
......@@ -163,7 +193,7 @@ class ObdCommand(BaseCommand):
version_fobj.seek(0)
version = version_fobj.read()
if VERSION != version:
for part in ['plugins', 'config_parser', 'mirror/remote']:
for part in ['plugins', 'config_parser', 'optimize', 'mirror/remote']:
obd_part_dir = os.path.join(self.OBD_PATH, part)
if DirectoryUtil.mkdir(self.OBD_PATH):
root_part_path = os.path.join(self.OBD_INSTALL_PRE, 'usr/obd/', part)
......@@ -177,14 +207,11 @@ class ObdCommand(BaseCommand):
@property
def dev_mode(self):
return COMMAND_ENV.get(DEV_MODE) == "1"
return COMMAND_ENV.get(ENV_DEV_MODE) == "1"
def parse_command(self):
self.parser.allow_undefine = self.dev_mode
return super(ObdCommand, self).parse_command()
def parse_command(self):
self.parser.allow_undefine = self.dev_mode
if self.parser.allow_undefine != True:
self.parser.allow_undefine = self.dev_mode
return super(ObdCommand, self).parse_command()
def do_command(self):
......@@ -196,11 +223,7 @@ class ObdCommand(BaseCommand):
log_dir = os.path.join(self.OBD_PATH, 'log')
DirectoryUtil.mkdir(log_dir)
log_path = os.path.join(log_dir, 'obd')
logger = Logger('obd')
handler = handlers.TimedRotatingFileHandler(log_path, when='midnight', interval=1, backupCount=30)
handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%s] [%%(levelname)s] %%(message)s" % trace_id, "%Y-%m-%d %H:%M:%S"))
logger.addHandler(handler)
ROOT_IO.trace_logger = logger
ROOT_IO.init_trace_logger(log_path, 'obd', trace_id)
obd = ObdHome(self.OBD_PATH, self.dev_mode, ROOT_IO)
ROOT_IO.track_limit += 1
ROOT_IO.verbose('cmd: %s' % self.cmds)
......@@ -294,7 +317,7 @@ class DevModeEnableCommand(HiddenObdCommand):
super(DevModeEnableCommand, self).__init__('enable', 'Enable Dev Mode')
def _do_command(self, obd):
if COMMAND_ENV.set(DEV_MODE, "1", save=True, stdio=obd.stdio):
if COMMAND_ENV.set(ENV_DEV_MODE, "1", save=True, stdio=obd.stdio):
obd.stdio.print("Dev Mode: ON")
return True
return False
......@@ -306,7 +329,7 @@ class DevModeDisableCommand(HiddenObdCommand):
super(DevModeDisableCommand, self).__init__('disable', 'Disable Dev Mode')
def _do_command(self, obd):
if COMMAND_ENV.set(DEV_MODE, "0", save=True, stdio=obd.stdio):
if COMMAND_ENV.set(ENV_DEV_MODE, "0", save=True, stdio=obd.stdio):
obd.stdio.print("Dev Mode: OFF")
return True
return False
......@@ -434,6 +457,11 @@ class MirrorListCommand(ObdCommand):
def __init__(self):
super(MirrorListCommand, self).__init__('list', 'List mirrors.')
def init(self, cmd, args):
super(MirrorListCommand, self).init(cmd, args)
self.parser.set_usage('%s [section name] [options]\n\nExample: %s local' % (self.prev_cmd, self.prev_cmd))
return self
def show_pkg(self, name, pkgs):
ROOT_IO.print_list(
pkgs,
......@@ -469,6 +497,7 @@ class MirrorListCommand(ObdCommand):
lambda x: [x.section_name, x.mirror_type.value, x.enabled, time.strftime("%Y-%m-%d %H:%M", time.localtime(x.repo_age))],
title='Mirror Repository List'
)
ROOT_IO.print("Use `obd mirror list <section name>` for more details")
return True
......@@ -588,6 +617,25 @@ class ClusterCheckForOCPChange(ClusterMirrorCommand):
return self._show_help()
class DemoCommand(ClusterMirrorCommand):
def __init__(self):
super(DemoCommand, self).__init__('demo', 'Quickly start')
self.parser.add_option('-c', '--components', type='string', help="List the components. Multiple components are separated with commas. [oceanbase-ce,obproxy-ce,obagent,prometheus,grafana]\nExample: \nstart oceanbase-ce: obd demo -c oceanbase-ce\n"
+ "start -c oceanbase-ce V3.2.3: obd demo -c oceanbase-ce --oceanbase-ce.version=3.2.3\n"
+ "start oceanbase-ce and obproxy-ce: obd demo -c oceanbase-ce,obproxy-ce", default='oceanbase-ce,obproxy-ce,obagent,prometheus,grafana')
self.parser.allow_undefine = True
self.parser.undefine_warn = False
def _do_command(self, obd):
setattr(self.opts, 'mini', True)
setattr(self.opts, 'force', True)
setattr(self.opts, 'clean', True)
setattr(self.opts, 'force', True)
setattr(self.opts, 'force_delete', True)
return obd.demo(self.opts)
class ClusterAutoDeployCommand(ClusterMirrorCommand):
def __init__(self):
......@@ -931,6 +979,7 @@ class MySQLTestCommand(TestMirrorCommand):
self.parser.add_option('--log-pattern', type='string', help='The pattern for collected servers log ', default='*.log')
self.parser.add_option('--cluster-mode', type='string', help="The mode of mysqltest")
self.parser.add_option('--disable-reboot', action='store_true', help='Never reboot during test.', default=False)
self.parser.add_option('--fast-reboot', action='store_true', help='Reboot using snapshots.', default=False)
def _do_command(self, obd):
if self.cmds:
......@@ -955,14 +1004,16 @@ class SysBenchCommand(TestMirrorCommand):
self.parser.add_option('--sysbench-script-dir', type='string', help='The directory of the sysbench lua script file. [/usr/sysbench/share/sysbench]', default='/usr/sysbench/share/sysbench')
self.parser.add_option('--table-size', type='int', help='Number of data initialized per table. [20000]', default=20000)
self.parser.add_option('--tables', type='int', help='Number of initialization tables. [30]', default=30)
self.parser.add_option('--threads', type='int', help='Number of threads to use. [32]', default=16)
self.parser.add_option('--threads', type='int', help='Number of threads to use. [16]', default=16)
self.parser.add_option('--time', type='int', help='Limit for total execution time in seconds. [60]', default=60)
self.parser.add_option('--interval', type='int', help='Periodically report intermediate statistics with a specified time interval in seconds. 0 disables intermediate reports. [10]', default=10)
self.parser.add_option('--events', type='int', help='Limit for total number of events.')
self.parser.add_option('--rand-type', type='string', help='Random numbers distribution {uniform,gaussian,special,pareto}.')
self.parser.add_option('--percentile', type='int', help='Percentile to calculate in latency statistics. Available values are 1-100. 0 means to disable percentile calculations.')
self.parser.add_option('--skip-trx', dest='{on/off}', type='string', help='Open or close a transaction in a read-only test. ')
self.parser.add_option('--skip-trx', type='string', help='Open or close a transaction in a read-only test. {on/off}')
self.parser.add_option('-O', '--optimization', type='int', help='optimization level {0/1}', default=1)
self.parser.add_option('-S', '--skip-cluster-status-check', action='store_true', help='Skip cluster status check', default=False)
self.parser.add_option('--mysql-ignore-errors', type='string', help='list of errors to ignore, or "all". ', default='1062')
def _do_command(self, obd):
if self.cmds:
......@@ -993,6 +1044,7 @@ class TPCHCommand(TestMirrorCommand):
self.parser.add_option('--dss-config', type='string', help='Directory for dists.dss. [/usr/tpc-h-tools/tpc-h-tools]', default='/usr/tpc-h-tools/tpc-h-tools/')
self.parser.add_option('-O', '--optimization', type='int', help='Optimization level {0/1}. [1]', default=1)
self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.')
self.parser.add_option('-S', '--skip-cluster-status-check', action='store_true', help='Skip cluster status check', default=False)
def _do_command(self, obd):
if self.cmds:
......@@ -1001,6 +1053,41 @@ class TPCHCommand(TestMirrorCommand):
return self._show_help()
class TPCDSCommand(TestMirrorCommand):
def __init__(self):
super(TPCDSCommand, self).__init__('tpcds', 'Run a TPC-DS test for a deployment.')
self.parser.add_option('--component', type='string', help='Components for a test.')
self.parser.add_option('--test-server', type='string', help='The server for a test. By default, the first root server in the component is the test server.')
self.parser.add_option('--user', type='string', help='Username for a test.')
self.parser.add_option('--password', type='string', help='Password for a test.')
self.parser.add_option('-t', '--tenant', type='string', help='Tenant for a test. [test]', default='test')
self.parser.add_option('--mode', type='string', help='Tenant compatibility mode. {mysql,oracle} [mysql]', default='mysql')
self.parser.add_option('--database', type='string', help='Database for a test. [test]', default='test')
self.parser.add_option('--obclient-bin', type='string', help='OBClient bin path. [obclient]', default='obclient')
self.parser.add_option('--tool-dir', type='string', help='tpc-ds tool dir. [/usr/tpc-ds-tools]')
self.parser.add_option('--dsdgen-bin', type='string', help='dsdgen bin path. [$TOOL_DIR/bin/dsdgen]')
self.parser.add_option('--idx-file', type='string', help='tpcds.idx file path. [$TOOL_DIR/bin/tpcds.idx]')
self.parser.add_option('--dsqgen-bin', type='string', help='dsqgen bin path. [$TOOL_DIR/bin/dsqgen]')
self.parser.add_option('--query-templates-dir', type='string', help='Query templates dir. [$TOOL_DIR/query_templates]')
self.parser.add_option('-s', '--scale', type='int', help='Set Scale Factor (SF) to <n>. [1] ', default=1)
self.parser.add_option('--disable-generate', '--dg', action='store_true', help='Do not generate test data.')
self.parser.add_option('-p', '--generate-parallel', help='Generate data parallel number. [0]', default=0)
self.parser.add_option('--tmp-dir', type='string', help='The temporary directory for executing TPC-H. [./tmp]', default='./tmp')
self.parser.add_option('--ddl-path', type='string', help='Directory for DDL files.')
self.parser.add_option('--sql-path', type='string', help='Directory for SQL files.')
self.parser.add_option('--create-foreign-key', '--fk', action='store_true', help='create foreign key.')
self.parser.add_option('--foreign-key-file', '--fk-file', action='store_true', help='SQL file for creating foreign key.')
self.parser.add_option('--remote-dir', type='string', help='Directory for the data file on target observers. Make sure that you have read and write access to the directory when you start observer.')
self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.')
def _do_command(self, obd):
if self.cmds:
return obd.tpcds(self.cmds[0], self.opts)
else:
return self._show_help()
class TPCCCommand(TestMirrorCommand):
def __init__(self):
......@@ -1024,6 +1111,7 @@ class TPCCCommand(TestMirrorCommand):
self.parser.add_option('--run-mins', type='int', help='To run for specified minutes.[10]', default=10)
self.parser.add_option('--test-only', action='store_true', help='Only testing SQLs are executed. No initialization is executed.')
self.parser.add_option('-O', '--optimization', type='int', help='Optimization level {0/1/2}. [1] 0 - No optimization. 1 - Optimize some of the parameters which do not need to restart servers. 2 - Optimize all the parameters and maybe RESTART SERVERS for better performance.', default=1)
self.parser.add_option('-S', '--skip-cluster-status-check', action='store_true', help='Skip cluster status check', default=False)
def _do_command(self, obd):
if self.cmds:
......@@ -1040,6 +1128,7 @@ class TestMajorCommand(MajorCommand):
self.register_command(SysBenchCommand())
self.register_command(TPCHCommand())
self.register_command(TPCCCommand())
# self.register_command(TPCDSCommand())
class DbConnectCommand(HiddenObdCommand):
......@@ -1054,8 +1143,7 @@ class DbConnectCommand(HiddenObdCommand):
self.parser.add_option('-c', '--component', type='string', help='The component used by database connection.')
self.parser.add_option('-s', '--server', type='string',
help='The server used by database connection. The first server in the configuration will be used by default')
self.parser.add_option('-u', '--user', type='string', help='The username used by d'
'atabase connection. [root]', default='root')
self.parser.add_option('-u', '--user', type='string', help='The username used by database connection. [root]', default='root')
self.parser.add_option('-p', '--password', type='string', help='The password used by database connection.')
self.parser.add_option('-t', '--tenant', type='string', help='The tenant used by database connection. [sys]', default='sys')
self.parser.add_option('-D', '--database', type='string', help='The database name used by database connection.')
......@@ -1068,6 +1156,30 @@ class DbConnectCommand(HiddenObdCommand):
return self._show_help()
class DoobaCommand(HiddenObdCommand):
def init(self, cmd, args):
super(DoobaCommand, self).init(cmd, args)
self.parser.set_usage('%s <deploy name> [options]' % self.prev_cmd)
return self
def __init__(self):
super(DoobaCommand, self).__init__('dooba', 'A curses powerful tool for OceanBase admin, more than a monitor')
self.parser.add_option('-c', '--component', type='string', help='The component used by database connection.')
self.parser.add_option('-s', '--server', type='string',
help='The server used by database connection. The first server in the configuration will be used by default')
self.parser.add_option('-u', '--user', type='string', help='The username used by database connection. [root]',
default='root')
self.parser.add_option('-p', '--password', type='string', help='The password used by database connection.')
self.parser.add_option('--dooba-bin', type='string', help='Dooba bin path.')
def _do_command(self, obd):
if self.cmds:
return obd.dooba(self.cmds[0], self.opts)
else:
return self._show_help()
class CommandsCommand(HiddenObdCommand):
def init(self, cmd, args):
......@@ -1093,6 +1205,7 @@ class ToolCommand(HiddenMajorCommand):
super(ToolCommand, self).__init__('tool', 'Tools')
self.register_command(DbConnectCommand())
self.register_command(CommandsCommand())
self.register_command(DoobaCommand())
class BenchMajorCommand(MajorCommand):
......@@ -1121,6 +1234,7 @@ class MainCommand(MajorCommand):
def __init__(self):
super(MainCommand, self).__init__('obd', '')
self.register_command(DevModeMajorCommand())
self.register_command(DemoCommand())
self.register_command(MirrorMajorCommand())
self.register_command(ClusterMajorCommand())
self.register_command(RepositoryMajorCommand())
......
......@@ -22,8 +22,10 @@ from __future__ import absolute_import, division, print_function
import os
import re
import sys
import pickle
import getpass
import hashlib
from copy import deepcopy
from enum import Enum
......@@ -33,12 +35,12 @@ from tool import ConfigUtil, FileUtil, YamlLoader, OrderedDict, COMMAND_ENV
from _manager import Manager
from _repository import Repository
from _stdio import SafeStdio
from _environ import ENV_BASE_DIR
yaml = YamlLoader()
DEFAULT_CONFIG_PARSER_MANAGER = None
ENV = 'env'
BASE_DIR_KEY = "OBD_DEPLOY_BASE_DIR"
class ParserError(Exception):
......@@ -383,6 +385,30 @@ class ClusterConfig(object):
self._depends = {}
self.parser = parser
self._has_package_pattern = None
self._object_hash = None
if sys.version_info.major == 2:
def __hash__(self):
if self._object_hash is None:
m_sum = hashlib.md5()
m_sum.update(str(self.package_hash).encode('utf-8'))
m_sum.update(str(self.get_global_conf()).encode('utf-8'))
for server in self.servers:
m_sum.update(str(self.get_server_conf(server)).encode('utf-8'))
m_sum.update(str(self.depends).encode('utf-8'))
self._object_hash = int(''.join(['%03d' % ord(v) for v in m_sum.digest()]))
return self._object_hash
else:
def __hash__(self):
if self._object_hash is None:
m_sum = hashlib.md5()
m_sum.update(str(self.package_hash).encode('utf-8'))
m_sum.update(str(self.get_global_conf()).encode('utf-8'))
for server in self.servers:
m_sum.update(str(self.get_server_conf(server)).encode('utf-8'))
m_sum.update(str(self.depends).encode('utf-8'))
self._object_hash = (int(''.join(['%03d' % v for v in m_sum.digest()])))
return self._object_hash
def __eq__(self, other):
if not isinstance(other, self.__class__):
......@@ -446,6 +472,9 @@ class ClusterConfig(object):
raise Exception('Circular Dependency: %s and %s' % (self.name, name))
self._depends[name] = cluster_conf
def add_depend_component(self, depend_component_name):
return self._deploy_config.add_depend_for_component(self.name, depend_component_name, save=False)
def del_depend(self, name, component_name):
if component_name in self._depends:
del self._depends[component_name]
......@@ -468,6 +497,8 @@ class ClusterConfig(object):
return False
if server not in self._server_conf:
return False
if self._temp_conf and key in self._temp_conf:
value = self._temp_conf[key].param_type(value).value
if not self._deploy_config.update_component_server_conf(self.name, server, key, value, save):
return False
self._server_conf[server][key] = value
......@@ -478,6 +509,8 @@ class ClusterConfig(object):
def update_global_conf(self, key, value, save=True):
if self._deploy_config is None:
return False
if self._temp_conf and key in self._temp_conf:
value = self._temp_conf[key].param_type(value).value
if not self._deploy_config.update_component_global_conf(self.name, key, value, save):
return False
self._update_global_conf(key, value)
......@@ -488,7 +521,8 @@ class ClusterConfig(object):
def _update_global_conf(self, key, value):
self._original_global_conf[key] = value
self._global_conf[key] = value
if self._global_conf:
self._global_conf[key] = value
def update_rsync_list(self, rsync_list, save=True):
if self._deploy_config is None:
......@@ -558,6 +592,13 @@ class ClusterConfig(object):
self._global_conf = None
self._clear_cache_server()
def _apply_temp_conf(self, conf):
if self._temp_conf:
for key in conf:
if key in self._temp_conf:
conf[key] = self._temp_conf[key].param_type(conf[key]).value
return conf
def get_temp_conf_item(self, key):
if self._temp_conf:
return self._temp_conf.get(key)
......@@ -613,7 +654,9 @@ class ClusterConfig(object):
if self._global_conf is None:
self._global_conf = deepcopy(self._default_conf)
self._global_conf.update(self._get_include_config('config', {}))
self._global_conf.update(self._original_global_conf)
if self._original_global_conf:
self._global_conf.update(self._original_global_conf)
self._global_conf = self._apply_temp_conf(self._global_conf)
return self._global_conf
def _add_base_dir(self, path):
......@@ -622,7 +665,7 @@ class ClusterConfig(object):
path = os.path.join(self._base_dir, path)
else:
raise Exception("`{}` need to use absolute paths. If you want to use relative paths, please enable developer mode "
"and set environment variables {}".format(RsyncConfig.RSYNC, BASE_DIR_KEY))
"and set environment variables {}".format(RsyncConfig.RSYNC, ENV_BASE_DIR))
return path
@property
......@@ -717,9 +760,9 @@ class ClusterConfig(object):
if server not in self._server_conf:
return None
if self._cache_server[server] is None:
conf = deepcopy(self._inner_config.get(server.name, {}))
conf = self._apply_temp_conf(deepcopy(self._inner_config.get(server.name, {})))
conf.update(self.get_global_conf())
conf.update(self._server_conf[server])
conf.update(self._apply_temp_conf(self._server_conf[server]))
self._cache_server[server] = conf
return self._cache_server[server]
......@@ -788,7 +831,7 @@ class DeployConfig(SafeStdio):
self.stdio = stdio
self._ignore_include_error = False
if self.config_parser_manager is None:
raise ParserError('ConfigParaserManager Not Set')
raise ParserError('ConfigParserManager Not Set')
self._load()
@property
......@@ -853,32 +896,35 @@ class DeployConfig(SafeStdio):
return False
def _load(self):
with open(self.yaml_path, 'rb') as f:
depends = {}
self._src_data = self.yaml_loader.load(f)
for key in self._src_data:
if key == 'user':
self.set_user_conf(UserConfig(
ConfigUtil.get_value_from_dict(self._src_data[key], 'username'),
ConfigUtil.get_value_from_dict(self._src_data[key], 'password'),
ConfigUtil.get_value_from_dict(self._src_data[key], 'key_file'),
ConfigUtil.get_value_from_dict(self._src_data[key], 'port', 0, int),
ConfigUtil.get_value_from_dict(self._src_data[key], 'timeout', 0, int),
))
elif key == 'unuse_lib_repository':
self.unuse_lib_repository = self._src_data['unuse_lib_repository']
elif key == 'auto_create_tenant':
self.auto_create_tenant = self._src_data['auto_create_tenant']
elif issubclass(type(self._src_data[key]), dict):
self._add_component(key, self._src_data[key])
depends[key] = self._src_data[key].get('depends', [])
for comp in depends:
conf = self.components[comp]
for name in depends[comp]:
if name == comp:
continue
if name in self.components:
conf.add_depend(name, self.components[name])
try:
with open(self.yaml_path, 'rb') as f:
depends = {}
self._src_data = self.yaml_loader.load(f)
for key in self._src_data:
if key == 'user':
self.set_user_conf(UserConfig(
ConfigUtil.get_value_from_dict(self._src_data[key], 'username'),
ConfigUtil.get_value_from_dict(self._src_data[key], 'password'),
ConfigUtil.get_value_from_dict(self._src_data[key], 'key_file'),
ConfigUtil.get_value_from_dict(self._src_data[key], 'port', 0, int),
ConfigUtil.get_value_from_dict(self._src_data[key], 'timeout', 0, int),
))
elif key == 'unuse_lib_repository':
self.unuse_lib_repository = self._src_data['unuse_lib_repository']
elif key == 'auto_create_tenant':
self.auto_create_tenant = self._src_data['auto_create_tenant']
elif issubclass(type(self._src_data[key]), dict):
self._add_component(key, self._src_data[key])
depends[key] = self._src_data[key].get('depends', [])
for comp in depends:
conf = self.components[comp]
for name in depends[comp]:
if name == comp:
continue
if name in self.components:
conf.add_depend(name, self.components[name])
except:
pass
if not self.user:
self.set_user_conf(UserConfig())
......@@ -889,7 +935,7 @@ class DeployConfig(SafeStdio):
def load_include_file(self, path):
if not os.path.isabs(path):
raise Exception("`{}` need to use absolute path. If you want to use relative paths, please enable developer mode "
"and set environment variables {}".format('include', BASE_DIR_KEY))
"and set environment variables {}".format('include', ENV_BASE_DIR))
if os.path.isfile(path):
with open(path, 'rb') as f:
return self.yaml_loader.load(f)
......@@ -909,7 +955,7 @@ class DeployConfig(SafeStdio):
if parser:
inner_config = parser.extract_inner_config(cluster_config, src_data)
self.inner_config.update_component_config(component_name, inner_config)
def _dump_inner_config(self):
if self.inner_config:
self._separate_config()
......
......@@ -50,7 +50,7 @@ class InitDirFailedErrorMessage(object):
DOC_LINK = '<DOC_LINK>'
DOC_LINK_MSG = 'See {}'.format(DOC_LINK if DOC_LINK else "https://open.oceanbase.com/docs/obd-cn/V1.4.0/10000000000436999 .")
DOC_LINK_MSG = 'See {}'.format(DOC_LINK if DOC_LINK else "https://www.oceanbase.com/product/ob-deployer/error-codes .")
EC_CONFIG_CONFLICT_PORT = OBDErrorCode(1000, 'Configuration conflict {server1}:{port} port is used for {server2}\'s {key}')
EC_CONFLICT_PORT = OBDErrorCode(1001, '{server}:{port} port is already used')
......@@ -76,4 +76,4 @@ EC_OBAGENT_RELOAD_FAILED = OBDErrorCode(4000, 'Fail to reload {server}')
EC_OBAGENT_SEND_CONFIG_FAILED = OBDErrorCode(4001, 'Fail to send config file to {server}')
# WARN CODE
WC_ULIMIT_CHECK = OBDErrorCode(1007, '({server}) The recommended number of {key} is {need} (Current value: %s)')
\ No newline at end of file
WC_ULIMIT_CHECK = OBDErrorCode(1007, '({server}) The recommended number of {key} is {need} (Current value: {now})')
\ No newline at end of file
......@@ -30,7 +30,7 @@ from copy import deepcopy
from _manager import Manager
from _rpm import Version
from ssh import ConcurrentExecutor
from tool import ConfigUtil, DynamicLoading, YamlLoader
from tool import ConfigUtil, DynamicLoading, YamlLoader, FileUtil
yaml = YamlLoader()
......@@ -38,9 +38,11 @@ yaml = YamlLoader()
class PluginType(Enum):
# 插件类型 = 插件加载类
START = 'StartPlugin'
PARAM = 'ParamPlugin'
INSTALL = 'InstallPlugin'
SNAP_CONFIG = 'SnapConfigPlugin'
PY_SCRIPT = 'PyScriptPlugin'
......@@ -125,7 +127,7 @@ class PluginContext(object):
self.options = options
self.dev_mode = dev_mode
self.stdio = stdio
self.concurrent_exector = ConcurrentExecutor(32)
self.concurrent_executor = ConcurrentExecutor(32)
self._return = PluginReturn()
def get_return(self):
......@@ -265,18 +267,28 @@ class PyScriptPlugin(ScriptPlugin):
# def init(self, components, ssh_clients, cluster_config, cmd, options, stdio, *arg, **kwargs):
# pass
class Null(object):
def __init__(self):
pass
class ParamPlugin(Plugin):
class ConfigItemType(object):
TYPE_STR = None
NULL = Null()
def __init__(self, s):
try:
self._origin = s
self._value = 0
self.value = self.NULL
self._format()
if self.value == self.NULL:
self.value = self._origin
except:
raise Exception("'%s' is not %s" % (self._origin, self._type_str))
......@@ -401,10 +413,48 @@ class ParamPlugin(Plugin):
else:
self._value = []
class Dict(ConfigItemType):
def _format(self):
if self._origin:
if not isinstance(self._origin, dict):
raise Exception("Invalid Value")
self._value = self._origin
else:
self._value = self.value = {}
class List(ConfigItemType):
def _format(self):
if self._origin:
if not isinstance(self._origin, list):
raise Exception("Invalid value: {} is not a list.".format(self._origin))
self._value = self._origin
else:
self._value = self.value = []
class StringOrKvList(ConfigItemType):
def _format(self):
if self._origin:
if not isinstance(self._origin, list):
raise Exception("Invalid value: {} is not a list.".format(self._origin))
for item in self._origin:
if not item:
continue
if not isinstance(item, (str, dict)):
raise Exception("Invalid value: {} should be string or key-value format.".format(item))
if isinstance(item, dict):
if len(item.keys()) != 1:
raise Exception("Invalid value: {} should be single key-value format".format(item))
self._value = self._origin
else:
self._value = self.value = []
class Double(ConfigItemType):
def _format(self):
self._value = float(self._origin) if self._origin else 0
self.value = self._value = float(self._origin) if self._origin else 0
class Boolean(ConfigItemType):
......@@ -413,10 +463,15 @@ class ParamPlugin(Plugin):
self._value = self._origin
else:
_origin = str(self._origin).lower()
if _origin.isdigit() or _origin in ['true', 'false']:
if _origin == 'true':
self._value = True
elif _origin == 'false':
self._value = False
elif _origin.isdigit():
self._value = bool(self._origin)
else:
raise Exception('%s is not Boolean')
raise Exception('%s is not Boolean' % _origin)
self.value = self._value
class Integer(ConfigItemType):
......@@ -426,15 +481,15 @@ class ParamPlugin(Plugin):
self._origin = 0
else:
_origin = str(self._origin)
if _origin.isdigit():
self._value = int(_origin)
else:
raise Exception('%s is not Integer')
try:
self.value = self._value = int(_origin)
except:
raise Exception('%s is not Integer' % _origin)
class String(ConfigItemType):
def _format(self):
self._value = str(self._origin) if self._origin else ''
self.value = self._value = str(self._origin) if self._origin else ''
class ConfigItem(object):
......@@ -519,29 +574,35 @@ class ParamPlugin(Plugin):
'MOMENT': ParamPlugin.Moment,
'TIME': ParamPlugin.Time,
'CAPACITY': ParamPlugin.Capacity,
'STRING_LIST': ParamPlugin.StringList
'STRING_LIST': ParamPlugin.StringList,
'DICT': ParamPlugin.Dict,
'LIST': ParamPlugin.List,
'PARAM_LIST': ParamPlugin.StringOrKvList
}
self._src_data = {}
with open(self.def_param_yaml_path, 'rb') as f:
configs = yaml.load(f)
for conf in configs:
param_type = ConfigUtil.get_value_from_dict(conf, 'type', 'STRING').upper()
if param_type in TYPES:
param_type = TYPES[param_type]
else:
param_type = ParamPlugin.String
self._src_data[conf['name']] = ParamPlugin.ConfigItem(
name=conf['name'],
param_type=param_type,
default=ConfigUtil.get_value_from_dict(conf, 'default', None),
min_value=ConfigUtil.get_value_from_dict(conf, 'min_value', None),
max_value=ConfigUtil.get_value_from_dict(conf, 'max_value', None),
modify_limit=ConfigUtil.get_value_from_dict(conf, 'modify_limit', None),
require=ConfigUtil.get_value_from_dict(conf, 'require', False),
need_restart=ConfigUtil.get_value_from_dict(conf, 'need_restart', False),
need_redeploy=ConfigUtil.get_value_from_dict(conf, 'need_redeploy', False)
)
try:
param_type = ConfigUtil.get_value_from_dict(conf, 'type', 'STRING').upper()
if param_type in TYPES:
param_type = TYPES[param_type]
else:
param_type = ParamPlugin.String
self._src_data[conf['name']] = ParamPlugin.ConfigItem(
name=conf['name'],
param_type=param_type,
default=ConfigUtil.get_value_from_dict(conf, 'default', None),
min_value=ConfigUtil.get_value_from_dict(conf, 'min_value', None),
max_value=ConfigUtil.get_value_from_dict(conf, 'max_value', None),
modify_limit=ConfigUtil.get_value_from_dict(conf, 'modify_limit', None),
require=ConfigUtil.get_value_from_dict(conf, 'require', False),
need_restart=ConfigUtil.get_value_from_dict(conf, 'need_restart', False),
need_redeploy=ConfigUtil.get_value_from_dict(conf, 'need_redeploy', False)
)
except:
pass
except:
pass
return self._src_data
......@@ -590,6 +651,40 @@ class ParamPlugin(Plugin):
return self._params_default
class SnapConfigPlugin(Plugin):
PLUGIN_TYPE = PluginType.SNAP_CONFIG
CONFIG_YAML = 'snap_config.yaml'
FLAG_FILE = CONFIG_YAML
_KEYCRE = re.compile(r"\$(\w+)")
def __init__(self, component_name, plugin_path, version, dev_mode):
super(SnapConfigPlugin, self).__init__(component_name, plugin_path, version, dev_mode)
self.config_path = os.path.join(self.plugin_path, self.CONFIG_YAML)
self._config = None
self._file_hash = None
def __hash__(self):
if self._file_hash is None:
self._file_hash = int(''.join(['%03d' % (ord(v) if isinstance(v, str) else v) for v in FileUtil.checksum(self.config_path)]))
return self._file_hash
@property
def config(self):
if self._config is None:
with open(self.config_path, 'rb') as f:
self._config = yaml.load(f)
return self._config
@property
def backup(self):
return self.config.get('backup', [])
@property
def clean(self):
return self.config.get('clean', [])
class InstallPlugin(Plugin):
class FileItemType(Enum):
......@@ -621,6 +716,7 @@ class InstallPlugin(Plugin):
self.file_map_path = os.path.join(self.plugin_path, self.FILES_MAP_YAML)
self._file_map = {}
self._file_map_data = None
self._check_value = None
@classmethod
def var_replace(cls, string, var):
......@@ -644,6 +740,12 @@ class InstallPlugin(Plugin):
return ''.join(done)
@property
def check_value(self):
if self._check_value is None:
self._check_value = os.path.getmtime(self.file_map_path)
return self._check_value
@property
def file_map_data(self):
if self._file_map_data is None:
......
......@@ -25,13 +25,14 @@ import sys
import time
import hashlib
from glob import glob
from multiprocessing import cpu_count
from multiprocessing.pool import Pool
from _rpm import Package, PackageInfo, Version
from _arch import getBaseArch
from tool import DirectoryUtil, FileUtil, YamlLoader
from _manager import Manager
from _plugin import InstallPlugin
from ssh import LocalClient
class LocalPackage(Package):
......@@ -122,15 +123,7 @@ class LocalPackage(Package):
filelinktos.append(os.readlink(target_path))
filemodes.append(-24065)
else:
ret = LocalClient().execute_command('md5sum {}'.format(target_path))
if ret:
m_value = ret.stdout.strip().split(' ')[0].encode('utf-8')
else:
m = hashlib.md5()
with open(target_path, 'rb') as f:
m.update(f.read())
m_value = m.hexdigest().encode(sys.getdefaultencoding())
# raise Exception('Failed to get md5sum for {}, error: {}'.format(target_path, ret.stderr))
m_value = FileUtil.checksum(target_path)
m_sum.update(m_value)
filemd5s.append(m_value)
filelinktos.append('')
......@@ -149,6 +142,73 @@ class LocalPackage(Package):
return self.RpmObject(self.headers, self.files)
class ExtractFileInfo(object):
def __init__(self, src_path, target_path, mode):
self.src_path = src_path
self.target_path = target_path
self.mode = mode
class ParallerExtractWorker(object):
def __init__(self, pkg, files, stdio=None):
self.pkg = pkg
self.files = files
self.stdio = stdio
@staticmethod
def extract(worker):
with worker.pkg.open() as rpm:
for info in worker.files:
if os.path.exists(info.target_path):
continue
fd = rpm.extractfile(info.src_path)
with FileUtil.open(info.target_path, 'wb', stdio=worker.stdio) as f:
FileUtil.copy_fileobj(fd, f)
if info.mode != 0o744:
os.chmod(info.target_path, info.mode)
class ParallerExtractor(object):
MAX_PARALLER = cpu_count()
def __init__(self, pkg, files, stdio=None):
self.pkg = pkg
self.files = files
self.stdio = stdio
def extract(self):
workers = []
file_num = len(self.files)
paraler = min(self.MAX_PARALLER, file_num)
size = min(100, file_num / paraler)
size = max(10, size)
index = 0
while index < file_num:
p_index = index + size
workers.append(ParallerExtractWorker(
self.pkg,
self.files[index:p_index],
stdio=self.stdio
))
index = p_index
pool = Pool(processes=paraler)
try:
results = pool.map(ParallerExtractWorker.extract, workers)
for r in results:
if not r:
return False
except KeyboardInterrupt:
if pool:
pool.close()
pool = None
finally:
pool and pool.close()
class Repository(PackageInfo):
_DATA_FILE = '.data'
......@@ -251,7 +311,7 @@ class Repository(PackageInfo):
self.stdio and getattr(self.stdio, 'print', '%s is a shadow repository' % self)
return False
hash_path = os.path.join(self.repository_dir, '.hash')
if self.hash == pkg.md5 and self.file_check(plugin):
if self.hash == pkg.md5 and self.file_check(plugin) and self.install_time > plugin.check_value:
return True
self.clear()
try:
......@@ -291,6 +351,8 @@ class Repository(PackageInfo):
if path.startswith(n_dir):
need_files[path] = os.path.join(need_dirs[n_dir], path[len(n_dir):])
break
need_extract_files = []
for src_path in need_files:
if src_path not in files:
raise Exception('%s not found in packge' % src_path)
......@@ -299,17 +361,17 @@ class Repository(PackageInfo):
return
idx = files[src_path]
if filemd5s[idx]:
fd = rpm.extractfile(src_path)
self.stdio and getattr(self.stdio, 'verbose', print)('extract %s to %s' % (src_path, target_path))
with FileUtil.open(target_path, 'wb', stdio=self.stdio) as f:
FileUtil.copy_fileobj(fd, f)
mode = filemodes[idx] & 0x1ff
if mode != 0o744:
os.chmod(target_path, mode)
need_extract_files.append(ExtractFileInfo(
src_path,
target_path,
filemodes[idx] & 0x1ff
))
elif filelinktos[idx]:
links[target_path] = filelinktos[idx]
else:
raise Exception('%s is directory' % src_path)
ParallerExtractor(pkg, need_extract_files, stdio=self.stdio).extract()
for link in links:
self.stdio and getattr(self.stdio, 'verbose', print)('link %s to %s' % (links[link], link))
......
......@@ -23,9 +23,13 @@ from __future__ import absolute_import, division, print_function
import os
import signal
import sys
import fcntl
import traceback
import inspect2
import six
import logging
from copy import deepcopy
from logging import handlers
from enum import Enum
from halo import Halo, cursor
......@@ -35,6 +39,8 @@ from progressbar import AdaptiveETA, Bar, SimpleProgress, ETA, FileTransferSpeed
from types import MethodType
from inspect2 import Parameter
from log import Logger
if sys.version_info.major == 3:
raw_input = input
......@@ -55,6 +61,87 @@ class BufferIO(object):
return s
class SysStdin(object):
NONBLOCK = False
STATS = None
FD = None
@classmethod
def fileno(cls):
if cls.FD is None:
cls.FD = sys.stdin.fileno()
return cls.FD
@classmethod
def stats(cls):
if cls.STATS is None:
cls.STATS = fcntl.fcntl(cls.fileno(), fcntl.F_GETFL)
return cls.STATS
@classmethod
def nonblock(cls):
if cls.NONBLOCK is False:
fcntl.fcntl(cls.fileno(), fcntl.F_SETFL, cls.stats() | os.O_NONBLOCK)
cls.NONBLOCK = True
@classmethod
def block(cls):
if cls.NONBLOCK:
fcntl.fcntl(cls.fileno(), fcntl.F_SETFL, cls.stats())
cls.NONBLOCK = True
@classmethod
def readline(cls, blocked=False):
if blocked:
cls.block()
else:
cls.nonblock()
return cls._readline()
@classmethod
def read(cls, blocked=False):
return ''.join(cls.readlines(blocked=blocked))
@classmethod
def readlines(cls, blocked=False):
if blocked:
cls.block()
else:
cls.nonblock()
return cls._readlines()
@classmethod
def _readline(cls):
if cls.NONBLOCK:
try:
for line in sys.stdin:
return line
except IOError:
return ''
finally:
cls.block()
else:
return sys.stdin.readline()
@classmethod
def _readlines(cls):
if cls.NONBLOCK:
lines = []
try:
for line in sys.stdin:
lines.append(line)
except IOError:
pass
finally:
cls.block()
return lines
else:
return sys.stdin.readlines()
class FormtatText(object):
@staticmethod
......@@ -234,11 +321,11 @@ class IO(object):
WARNING_PREV = FormtatText.warning('[WARN]')
ERROR_PREV = FormtatText.error('[ERROR]')
IS_TTY = sys.stdin.isatty()
INPUT = SysStdin
def __init__(self,
level,
msg_lv=MsgLevel.DEBUG,
trace_logger=None,
use_cache=False,
track_limit=0,
root_io=None,
......@@ -246,7 +333,11 @@ class IO(object):
):
self.level = level
self.msg_lv = msg_lv
self.trace_logger = trace_logger
self.log_path = None
self.trace_id = None
self.log_name = 'default'
self.log_path = None
self._trace_logger = None
self._log_cache = [] if use_cache else None
self._root_io = root_io
self.track_limit = track_limit
......@@ -257,6 +348,34 @@ class IO(object):
self._cur_out_obj = self._out_obj
self._before_critical = None
def init_trace_logger(self, log_path, log_name=None, trace_id=None):
if self._trace_logger is None:
self.log_path = log_path
if trace_id:
self.trace_id = trace_id
if log_name:
self.log_name = log_name
def __getstate__(self):
state = {}
for key in self.__dict__:
state[key] = self.__dict__[key]
for key in ['_trace_logger', 'sync_obj', '_out_obj', '_cur_out_obj', '_before_critical']:
state[key] = None
return state
@property
def trace_logger(self):
if self.log_path and self._trace_logger is None:
self._trace_logger = Logger(self.log_name)
handler = handlers.TimedRotatingFileHandler(self.log_path, when='midnight', interval=1, backupCount=30)
if self.trace_id:
handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%s] [%%(levelname)s] %%(message)s" % self.trace_id, "%Y-%m-%d %H:%M:%S"))
else:
handler.setFormatter(logging.Formatter("[%%(asctime)s.%%(msecs)03d] [%%(levelname)s] %%(message)s", "%Y-%m-%d %H:%M:%S"))
self._trace_logger.addHandler(handler)
return self._trace_logger
@property
def log_cache(self):
if self._root_io:
......@@ -417,13 +536,17 @@ class IO(object):
msg_lv = self.msg_lv
key = "%s-%s" % (pid, msg_lv)
if key not in self.sub_ios:
self.sub_ios[key] = self.__class__(
sub_io = self.__class__(
self.level + 1,
msg_lv=msg_lv,
trace_logger=self.trace_logger,
track_limit=self.track_limit,
root_io=self._root_io if self._root_io else self
)
sub_io.log_name = self.log_name
sub_io.log_path = self.log_path
sub_io.trace_id = self.trace_id
sub_io._trace_logger = self.trace_logger
self.sub_ios[key] = sub_io
return self.sub_ios[key]
def print_list(self, ary, field_names=None, exp=lambda x: x if isinstance(x, (list, tuple)) else [x], show_index=False, start=0, **kwargs):
......@@ -445,11 +568,18 @@ class IO(object):
table.add_row(row)
self.print(table)
def read(self, msg='', blocked=False):
if msg:
self._print(MsgLevel.INFO, msg)
return self.INPUT.read(blocked)
def confirm(self, msg):
msg = '%s [y/n]: ' % msg
self.print(msg, end='')
if self.IS_TTY:
while True:
try:
ans = raw_input('%s [y/n]: ' % msg)
ans = raw_input()
if ans == 'y':
return True
if ans == 'n':
......@@ -595,6 +725,8 @@ class StdIO(object):
self._warn_func = getattr(self.io, "warn", print)
def __getattr__(self, item):
if item.startswith('__'):
return super(StdIO, self).__getattribute__(item)
if self.io is None:
return FAKE_RETURN
if item not in self._attrs:
......
此差异已折叠。
MySQL-python==1.2.5
pycryptodome==3.10.1
\ No newline at end of file
pycryptodome==3.10.1
bcrypt==3.1.7
\ No newline at end of file
PyMySQL==1.0.2
pycryptodome==3.10.1
\ No newline at end of file
pycryptodome==3.10.1
bcrypt==4.0.0
configparser>=5.2.0
\ No newline at end of file
......@@ -91,10 +91,10 @@ def check_opt(plugin_context, name, context, *args, **kwargs):
if not clients:
stdio.error("{} server list is empty".format(','.join(components)))
return
if servers is None:
if interactive:
servers = [None, ]
servers = cluster_config.servers[:1]
stdio.verbose("Server {} will be used according to the order in the deploy configuration yaml.".format(servers[0]))
else:
servers = list(clients.keys())
stdio.verbose("Server {} will be used because {} is a non-interactive command".format(", ".join([str(s) for s in servers]), name))
......
......@@ -87,7 +87,9 @@ def check_opt(plugin_context, opt, *args, **kwargs):
ob_component = intersection[0]
global_config = cluster_config.get_depend_config(ob_component)
else:
ob_component = opt["component"]
global_config = cluster_config.get_global_conf()
opt['is_business'] = 1 if ob_component == 'oceanbase' else 0
cursor = opt['cursor']
opt['_enable_static_typing_engine'] = None
if '_enable_static_typing_engine' in global_config:
......
......@@ -22,8 +22,11 @@ from __future__ import absolute_import, division, print_function
import re
import os
from ssh import LocalClient
import time
import hashlib
from ssh import LocalClient
from tool import FileUtil
from _errno import EC_MYSQLTEST_FAILE_NOT_FOUND, EC_MYSQLTEST_PARSE_CMD_FAILED
......@@ -58,21 +61,23 @@ def get_memory_limit(cursor, client):
return 0
def get_root_server(cursor):
try:
cursor.execute('select * from oceanbase.__all_server where status = \'active\' and with_rootserver=1')
return cursor.fetchone()
except:
pass
return None
def init(plugin_context, env, *args, **kwargs):
def get_root_server(cursor):
while True:
try:
cursor.execute('select * from oceanbase.__all_server where status = \'active\' and with_rootserver=1')
return cursor.fetchone()
except:
if load_snap:
time.sleep(0.1)
continue
return None
def exec_sql(cmd):
ret = re.match('(.*\.sql)(?:\|([^\|]*))?(?:\|([^\|]*))?', cmd)
if not ret:
stdio.error(EC_MYSQLTEST_PARSE_CMD_FAILED.format(path=cmd))
return False
return None
cmd = ret.groups()
sql_file_path1 = os.path.join(init_sql_dir, cmd[0])
sql_file_path2 = os.path.join(plugin_init_sql_dir, cmd[0])
......@@ -82,19 +87,27 @@ def init(plugin_context, env, *args, **kwargs):
sql_file_path = sql_file_path2
else:
stdio.error(EC_MYSQLTEST_FAILE_NOT_FOUND.format(file=cmd[0], path='[%s, %s]' % (init_sql_dir, plugin_init_sql_dir)))
return False
exec_sql_cmd = exec_sql_temp % (cmd[1] if cmd[1] else 'root', cmd[2] if cmd[2] else 'oceanbase', sql_file_path)
ret = LocalClient.execute_command(exec_sql_cmd, stdio=stdio)
if ret:
return True
stdio.error('Failed to Excute %s: %s' % (sql_file_path, ret.stderr.strip()))
return False
return None
if load_snap:
exec_sql_cmd = exec_sql_connect % (cmd[1] if cmd[1] else 'root')
else:
exec_sql_cmd = exec_sql_execute % (cmd[1] if cmd[1] else 'root', cmd[2] if cmd[2] else 'oceanbase', sql_file_path)
while True:
ret = LocalClient.execute_command(exec_sql_cmd, stdio=stdio)
if ret:
return sql_file_path
if load_snap:
time.sleep(0.1)
continue
stdio.error('Failed to Excute %s: %s' % (sql_file_path, ret.stderr.strip()))
return None
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
load_snap = env.get('load_snap', False)
cursor = env['cursor']
obclient_bin = env['obclient_bin']
mysqltest_bin = env['mysqltest_bin']
server = env['test_server']
root_server = get_root_server(cursor)
if root_server:
......@@ -105,7 +118,8 @@ def init(plugin_context, env, *args, **kwargs):
return plugin_context.return_false()
init_sql_dir = env['init_sql_dir']
plugin_init_sql_dir = os.path.join(os.path.split(__file__)[0], 'init_sql')
exec_sql_temp = obclient_bin + ' --prompt "OceanBase(\\u@\d)>" -h ' + host + ' -P ' + str(port) + ' -u%s -D%s -c < %s'
exec_sql_execute = obclient_bin + ' --prompt "OceanBase(\\u@\d)>" -h ' + host + ' -P ' + str(port) + ' -u%s -D%s -c < %s'
exec_sql_connect = obclient_bin + ' --prompt "OceanBase(\\u@\d)>" -h ' + host + ' -P ' + str(port) + ' -u%s -e "select 1 from DUAL"'
if 'init_sql_files' in env and env['init_sql_files']:
init_sql = env['init_sql_files'].split(',')
......@@ -116,15 +130,21 @@ def init(plugin_context, env, *args, **kwargs):
client = plugin_context.clients[server]
memory_limit = get_memory_limit(cursor, client)
is_mini = memory_limit and parse_size(memory_limit) < (16<<30)
if is_mini:
init_sql = [exec_mini_init, exec_init_user]
if env['is_business']:
init_sql = [exec_mini_init if is_mini else exec_init, exec_init_user_for_oracle, exec_init_user]
else:
init_sql = [exec_init, exec_init_user]
init_sql = [exec_mini_init if is_mini else exec_init, exec_init_user]
m_sum = hashlib.md5() if not load_snap else None
stdio.start_loading('Execute initialize sql')
for sql in init_sql:
if not exec_sql(sql):
sql_file_path = exec_sql(sql)
if not sql_file_path:
stdio.stop_loading('fail')
return plugin_context.return_false()
m_sum and m_sum.update(FileUtil.checksum(sql_file_path))
stdio.stop_loading('succeed')
if m_sum:
env['init_file_md5'] = m_sum.hexdigest()
return plugin_context.return_true()
......@@ -2,5 +2,3 @@ use oceanbase;
create user 'admin' IDENTIFIED BY 'admin';
grant all on *.* to 'admin' WITH GRANT OPTION;
create database obproxy;
alter system set _enable_split_partition = true;
......@@ -215,6 +215,7 @@ def run_test(plugin_context, env, *args, **kwargs):
continue
run_test_cases.append(test)
if test in reboot_cases:
stdio.print('Reboot cluster because case "{}" is in the reboot cases list.'.format(test))
need_reboot = True
if need_reboot:
need_reboot = False
......@@ -324,6 +325,11 @@ def run_test(plugin_context, env, *args, **kwargs):
opt['slave_cmp'] = 0
opt['result_file'] = result_file
if not opt['is_business']:
ce_result_file = re.sub(r'\.result$', '.ce.result', opt['result_file'])
if os.path.exists(ce_result_file):
opt['result_file'] = ce_result_file
if 'my_host' in opt or 'oracle_host' in opt:
# compare mode
pass
......@@ -335,13 +341,21 @@ def run_test(plugin_context, env, *args, **kwargs):
stdio.verbose('query engine result: {}'.format(result.stdout))
if not result:
stdio.error('engine failed, exit code %s. error msg: %s' % (result.code, result.stderr))
obmysql_ms0_dev = str(opt['host'])
if ':' in opt['host']:
# todo: obproxy没有网卡设备选项,可能会遇到问题。如果obproxy支持IPv6后续进行改造
devname = cluster_config.get_server_conf(opt['test_server']).get('devname')
if devname:
obmysql_ms0_dev = '{}%{}'.format(opt['host'], devname)
update_env = {
'OBMYSQL_PORT': str(opt['port']),
'OBMYSQL_MS0': str(opt['host']),
'OBMYSQL_MS0_DEV': obmysql_ms0_dev,
'OBMYSQL_PWD': str(opt['password']),
'OBMYSQL_USR': opt['user'],
'PATH': os.getenv('PATH'),
'OBSERVER_DIR': cluster_config.get_server_conf(opt['test_server'])['home_path']
'OBSERVER_DIR': cluster_config.get_server_conf(opt['test_server'])['home_path'],
'IS_BUSINESS': str(opt['is_business'])
}
test_env = deepcopy(os.environ.copy())
test_env.update(update_env)
......@@ -442,5 +456,4 @@ def run_test(plugin_context, env, *args, **kwargs):
# retry
is_retry = True
need_reboot = True
return return_true(finished=True)
return return_true(finished=True)
\ No newline at end of file
......@@ -19,6 +19,7 @@
from __future__ import absolute_import, division, print_function
import socket
def display(plugin_context, cursor, *args, **kwargs):
stdio = plugin_context.stdio
......@@ -34,9 +35,12 @@ def display(plugin_context, cursor, *args, **kwargs):
else:
auth = '--user %s:%s' % (config['http_basic_auth_user'], config['http_basic_auth_password'])
cmd = '''curl %s -H "Content-Type:application/json" -L "http://%s:%s/metrics/stat"''' % (auth, server.ip, config['server_port'])
ip = server.ip
if ip == '127.0.0.1':
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
result.append({
'ip': server.ip,
'ip': ip,
'status': 'active' if client.execute_command(cmd) else 'inactive',
'server_port': config['server_port'],
'pprof_port': config['pprof_port']
......
......@@ -21,7 +21,7 @@
from __future__ import absolute_import, division, print_function
def generate_config(plugin_context, deploy_config, *args, **kwargs):
def generate_config(plugin_context, deploy_config, auto_depend=False, *args, **kwargs):
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
......@@ -60,6 +60,11 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
cluster_config.update_server_conf(server, 'ob_monitor_status', 'inactive', False)
else:
cluster_config.update_global_conf('ob_monitor_status', 'inactive', False)
if auto_depend:
for depend in depends:
if cluster_config.add_depend_component(depend):
cluster_config.update_global_conf('ob_monitor_status', 'active', False)
break
stdio.stop_loading('succeed')
plugin_context.return_true()
......@@ -50,7 +50,17 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if need_clean:
client.execute_command("pkill -9 -u `whoami` -f '^%s/bin/monagent -c conf/monagent.yaml'" % home_path)
ret = client.execute_command('rm -fr %s' % home_path)
if client.execute_command('bash -c \'if [[ "$(ls -d {0} 2>/dev/null)" != "" && ! -O {0} ]]; then exit 0; else exit 1; fi\''.format(home_path)):
owner = client.execute_command("ls -ld %s | awk '{print $3}'" % home_path).stdout.strip()
global_ret = False
err_msg = ' {} is not empty, and the owner is {}'.format(home_path, owner)
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=err_msg))
continue
need_clean = True
if need_clean:
client.execute_command("pkill -9 -u `whoami` -f '^%s/bin/monagent -c conf/monagent.yaml'" % home_path)
ret = client.execute_command('rm -fr %s' % home_path, timeout=-1)
if not ret:
global_ret = False
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
......
......@@ -151,7 +151,7 @@
- name: monitor_password
require: false
type: STRING
default: NULL
default: ''
min_value: NULL
max_value: NULL
need_restart: false
......@@ -237,4 +237,18 @@
max_value: NULL
need_restart: false
description_en: whether to disable the basic authentication for the debug interface. True is to disable. False is to enable.
description_local: 是否禁用 debug 接口的basic auth 认证,true 表示禁用,false 表示不禁用
\ No newline at end of file
description_local: 是否禁用 debug 接口的basic auth 认证,true 表示禁用,false 表示不禁用
- name: target_sync_configs
require: false
type: PARAM_LIST
need_restart: true
description_en:
description_local: '''将地址同步至指定远端目录
target_sync_configs:
- host: 192.168.1.1
target_dir: /data/prometheus/targets
user: user1
port: 22
# password: *****
key_file: xxxxx
'''
\ No newline at end of file
......@@ -35,8 +35,7 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
clients = plugin_context.clients
servers = cluster_config.servers
yaml = YamlLoader(stdio)
config_map = {
config_map = {
"monitor_password": "root_password",
"sql_port": "mysql_port",
"rpc_port": "rpc_port",
......@@ -62,13 +61,13 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
with open(path) as f:
data = yaml.load(f)['configs']
for config in data:
key = config.get('value')
if key and isinstance(key, dict):
key = list(key.keys())[0]
config_kv[key] = key
value = config.get('value')
key = config.get('key')
if key and value and isinstance(value, dict):
value = list(value.keys())[0]
config_kv[value] = key
global_ret = True
stdio.start_load('Reload obagent')
stdio.start_loading('Reload obagent')
for server in servers:
change_conf = deepcopy(global_change_conf)
client = clients[server]
......@@ -111,8 +110,8 @@ def reload(plugin_context, repository_dir, new_cluster_config, *args, **kwargs):
stdio.error(EC_OBAGENT_RELOAD_FAILED.format(server=server))
if global_ret:
stdio.stop_load('succeed')
stdio.stop_loading('succeed')
return plugin_context.return_true()
else:
stdio.stop_load('fail')
stdio.stop_loading('fail')
return
......@@ -26,7 +26,7 @@ import os
class Restart(object):
def __init__(self, plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None):
def __init__(self, plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None, deploy_name=None):
self.local_home_path = local_home_path
self.plugin_context = plugin_context
self.components = plugin_context.components
......@@ -42,6 +42,7 @@ class Restart(object):
self.new_clients = new_clients
self.new_cluster_config = new_cluster_config
self.sub_io = self.stdio.sub_io()
self.deploy_name = deploy_name
def dir_read_check(self, client, path):
if not client.execute_command('cd %s' % path):
......@@ -70,7 +71,7 @@ class Restart(object):
cluster_config = self.new_cluster_config if self.new_cluster_config else self.cluster_config
self.stdio.verbose('Call %s for %s' % (self.start_plugin, self.repository))
if not self.start_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, local_home_path=self.local_home_path, repository_dir=self.repository.repository_dir):
if not self.start_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, local_home_path=self.local_home_path, repository_dir=self.repository.repository_dir, deploy_name=self.deploy_name):
self.rollback()
self.stdio.stop_loading('stop_loading', 'fail')
return False
......@@ -87,8 +88,11 @@ class Restart(object):
new_client.execute_command('sudo chown -R %s: %s' % (client.config.username, home_path))
def restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None, rollback=False, *args, **kwargs):
task = Restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config, new_clients)
def restart(plugin_context, local_home_path, start_plugin, reload_plugin, stop_plugin, connect_plugin, display_plugin, repository, new_cluster_config=None, new_clients=None, rollback=False, deploy_name=None, *args, **kwargs):
task = Restart(plugin_context=plugin_context, local_home_path=local_home_path, start_plugin=start_plugin,
reload_plugin=reload_plugin, stop_plugin=stop_plugin, connect_plugin=connect_plugin,
display_plugin=display_plugin, repository=repository, new_cluster_config=new_cluster_config,
new_clients=new_clients, deploy_name=deploy_name)
call = task.rollback if rollback else task.restart
if call():
plugin_context.return_true()
......@@ -33,6 +33,7 @@ from copy import deepcopy
from Crypto import Random
from Crypto.Cipher import AES
from ssh import SshClient, SshConfig
from tool import YamlLoader
from _errno import *
......@@ -136,7 +137,7 @@ def generate_aes_b64_key():
return base64.b64encode(key.encode('utf-8'))
def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
def start(plugin_context, local_home_path, repository_dir, deploy_name=None, *args, **kwargs):
global stdio
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
......@@ -183,7 +184,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if key and isinstance(key, dict):
key = list(key.keys())[0]
need_encrypted.append(key)
targets = []
for server in cluster_config.servers:
client = clients[server]
server_config = deepcopy(cluster_config.get_server_conf(server))
......@@ -192,7 +193,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
home_path = server_config['home_path']
remote_pid_path = '%s/run/obagent-%s-%s.pid' % (home_path, server.ip, server_config["server_port"])
pid_path[server] = remote_pid_path
server_port = int(server_config['server_port'])
targets.append('{}:{}'.format(server.ip, server_port))
remote_pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
continue
......@@ -260,8 +262,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
'compress': True if server_config.get('log_compress', True) else False
},
'server': {
'address': '0.0.0.0:%d' % int(server_config.get('server_port', 8088)),
'adminAddress': '0.0.0.0:%d' % int(server_config.get('pprof_port', 8089)),
'address': '0.0.0.0:%d' % server_port,
'adminAddress': '0.0.0.0:%d' % int(server_config['pprof_port']),
'runDir': 'run'
},
'cryptoMethod': server_config['crypto_method'] if server_config.get('crypto_method').lower() in ['aes', 'plain'] else 'plain',
......@@ -305,5 +307,43 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.warn(msg)
plugin_context.return_false()
else:
global_config = cluster_config.get_global_conf()
target_sync_configs = global_config.get('target_sync_configs', [])
stdio.verbose('start to sync target config')
data = [{'targets': targets}]
default_ssh_config = None
for client in clients.values():
default_ssh_config = client.config
break
for target_sync_config in target_sync_configs:
host = None
target_dir = None
try:
host = target_sync_config.get('host')
target_dir = target_sync_config.get('target_dir')
if not host or not target_dir:
continue
ssh_config_keys = ['username', 'password', 'port', 'key_file', 'timeout']
auth_keys = ['username', 'password', 'key_file']
for key in auth_keys:
if key in target_sync_config:
config = SshConfig(host)
break
else:
config = deepcopy(default_ssh_config)
for key in ssh_config_keys:
if key in target_sync_config:
setattr(config, key, target_sync_config[key])
with tempfile.NamedTemporaryFile(suffix='.yaml') as f:
yaml.dump(data, f)
f.flush()
file_name = '{}.yaml'.format(deploy_name or hash(cluster_config))
file_path = os.path.join(target_dir, file_name)
remote_client = SshClient(config)
remote_client.connect()
remote_client.put_file(f.name, file_path)
except:
stdio.warn('failed to sync target to {}:{}'.format(host, target_dir))
stdio.exception('')
stdio.stop_loading('succeed')
plugin_context.return_true(need_bootstrap=False)
......@@ -151,7 +151,7 @@
- name: monitor_password
require: false
type: STRING
default: NULL
default: ''
min_value: NULL
max_value: NULL
need_restart: false
......@@ -264,4 +264,10 @@
max_value: NULL
need_restart: false
description_en: Working directory for OceanBase Database, needed when log alarm is enabled.
description_local: OceanBase 安装路径, 当日志报警开启时需要
\ No newline at end of file
description_local: OceanBase 安装路径, 当日志报警开启时需要
- name: target_sync_configs
require: false
type: LIST
need_restart: true
description_en:
description_local: 将地址同步至指定远端目录
\ No newline at end of file
......@@ -33,6 +33,7 @@ from copy import deepcopy
from Crypto import Random
from Crypto.Cipher import AES
from ssh import SshClient, SshConfig
from tool import YamlLoader
from _errno import *
......@@ -136,7 +137,7 @@ def generate_aes_b64_key():
return base64.b64encode(key.encode('utf-8'))
def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
def start(plugin_context, local_home_path, repository_dir, deploy_name=None, *args, **kwargs):
global stdio
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
......@@ -184,7 +185,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if key and isinstance(key, dict):
key = list(key.keys())[0]
need_encrypted.append(key)
targets = []
for server in cluster_config.servers:
client = clients[server]
server_config = deepcopy(cluster_config.get_server_conf(server))
......@@ -193,7 +194,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
home_path = server_config['home_path']
remote_pid_path = '%s/run/obagent-%s-%s.pid' % (home_path, server.ip, server_config["server_port"])
pid_path[server] = remote_pid_path
server_port = int(server_config['server_port'])
targets.append('{}:{}'.format(server.ip, server_port))
remote_pid = client.execute_command("cat %s" % pid_path[server]).stdout.strip()
if remote_pid and client.execute_command('ls /proc/%s' % remote_pid):
continue
......@@ -255,8 +257,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
'compress': True if server_config.get('log_compress', True) else False
},
'server': {
'address': '0.0.0.0:%d' % int(server_config.get('server_port', 8088)),
'adminAddress': '0.0.0.0:%d' % int(server_config.get('pprof_port', 8089)),
'address': '0.0.0.0:%d' % server_port,
'adminAddress': '0.0.0.0:%d' % int(server_config['pprof_port']),
'runDir': 'run'
},
'cryptoMethod': server_config['crypto_method'] if server_config.get('crypto_method').lower() in ['aes', 'plain'] else 'plain',
......@@ -300,5 +302,43 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
stdio.warn(msg)
plugin_context.return_false()
else:
global_config = cluster_config.get_global_conf()
target_sync_configs = global_config.get('target_sync_configs', [])
stdio.verbose('start to sync target config')
data = [{'targets': targets}]
default_ssh_config = None
for client in clients.values():
default_ssh_config = client.config
break
for target_sync_config in target_sync_configs:
host = None
target_dir = None
try:
host = target_sync_config.get('host')
target_dir = target_sync_config.get('target_dir')
if not host or not target_dir:
continue
ssh_config_keys = ['username', 'password', 'port', 'key_file', 'timeout']
auth_keys = ['username', 'password', 'key_file']
for key in auth_keys:
if key in target_sync_config:
config = SshConfig(host)
break
else:
config = deepcopy(default_ssh_config)
for key in ssh_config_keys:
if key in target_sync_config:
setattr(config, key, target_sync_config[key])
with tempfile.NamedTemporaryFile(suffix='.yaml') as f:
yaml.dump(data, f)
f.flush()
file_name = '{}.yaml'.format(deploy_name or hash(cluster_config))
file_path = os.path.join(target_dir, file_name)
remote_client = SshClient(config)
remote_client.connect(stdio=stdio)
remote_client.put_file(f.name, file_path, stdio=stdio)
except:
stdio.warn('failed to sync target to {}:{}'.format(host, target_dir))
stdio.exception('')
stdio.stop_loading('succeed')
plugin_context.return_true(need_bootstrap=False)
......@@ -44,4 +44,25 @@ def display(plugin_context, cursor, *args, **kwargs):
result.append(data)
stdio.print_list(result, ['ip', 'port', 'prometheus_port', 'status'],
lambda x: [x['ip'], x['listen_port'], x['prometheus_listen_port'], x['status']], title='obproxy')
server = servers[0]
with_observer = False
server_config = cluster_config.get_server_conf(server)
cmd = ''
for comp in ['oceanbase', 'oceanbase-ce']:
if comp in cluster_config.depends:
ob_config = cluster_config.get_depend_config(comp)
if not ob_config:
continue
password = ob_config.get('root_password', '')
with_observer = True
cmd = 'obclient -h%s -P%s -uroot %s-Doceanbase' % (server.ip, server_config['listen_port'], '-p%s ' % password if password else '')
break
if not with_observer:
password = server_config.get('obproxy_sys_password', '')
cmd = 'obclient -h%s -P%s -uroot@proxysys %s-Doceanbase' % (server.ip, server_config['listen_port'], '-p%s ' % password if password else '')
stdio.print(cmd)
plugin_context.return_true()
......@@ -21,7 +21,7 @@
from __future__ import absolute_import, division, print_function
def generate_config(plugin_context, deploy_config, *args, **kwargs):
def generate_config(plugin_context, deploy_config, auto_depend=False, *args, **kwargs):
cluster_config = plugin_context.cluster_config
clients = plugin_context.clients
stdio = plugin_context.stdio
......@@ -44,12 +44,25 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
cluster_config.update_global_conf('skip_proxy_sys_private_check', True, False)
if 'enable_strict_kernel_release' not in global_config:
cluster_config.update_global_conf('enable_strict_kernel_release', False, False)
if getattr(plugin_context.options, 'mini', False):
if 'proxy_mem_limited' not in global_config:
cluster_config.update_global_conf('proxy_mem_limited', '200M', False)
ob_comps = ['oceanbase', 'oceanbase-ce']
ob_cluster_config = None
for comp in ['oceanbase', 'oceanbase-ce']:
for comp in ob_comps:
if comp in cluster_config.depends:
stdio.stop_loading('succeed')
return plugin_context.return_true()
if comp in deploy_config.components:
ob_cluster_config = deploy_config.components[comp]
break
if auto_depend:
for depend in ['oceanbase', 'oceanbase-ce']:
if cluster_config.add_depend_component(depend):
stdio.stop_loading('succeed')
return plugin_context.return_true()
if ob_cluster_config:
root_servers = {}
......
......@@ -51,14 +51,25 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if need_clean:
client.execute_command("pkill -9 -u `whoami` -f '^bash {home_path}/obproxyd.sh {home_path} {ip} {port} daemon$'".format(home_path=home_path, ip=server.ip, port=server_config.get('listen_port')))
client.execute_command("pkill -9 -u `whoami` -f '^%s/bin/obproxy --listen_port %s'" % (home_path, server_config.get('listen_port')))
ret = client.execute_command('rm -fr %s' % home_path)
if client.execute_command('bash -c \'if [[ "$(ls -d {0} 2>/dev/null)" != "" && ! -O {0} ]]; then exit 0; else exit 1; fi\''.format(home_path)):
owner = client.execute_command("ls -ld %s | awk '{print $3}'" % home_path).stdout.strip()
global_ret = False
err_msg = ' {} is not empty, and the owner is {}'.format(home_path, owner)
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=err_msg))
continue
need_clean = True
if need_clean:
client.execute_command("pkill -9 -u `whoami` -f '^bash {home_path}/obproxyd.sh {home_path} {ip} {port} daemon$'".format(home_path=home_path, ip=server.ip, port=server_config.get('listen_port')))
client.execute_command("pkill -9 -u `whoami` -f '^%s/bin/obproxy --listen_port %s'" % (home_path, server_config.get('listen_port')))
ret = client.execute_command('rm -fr %s' % home_path, timeout=-1)
if not ret:
global_ret = False
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
continue
if not client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib}'" % home_path):
global_ret = False
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.NOT_EMPTY.format(path=home_path)))
stdio.error(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=home_path)))
if global_ret:
stdio.stop_loading('succeed')
......
......@@ -372,22 +372,16 @@
- name: local_bound_ip
type: STRING
default: 0.0.0.0
max_value: ''
min_value: ''
need_restart: true
description_en: local bound ip(any)
- name: obproxy_config_server_url
type: STRING
default: ''
max_value: ''
min_value: ''
need_restart: true
description_en: url of config info(rs list and so on)
- name: proxy_service_mode
type: STRING
default: ''
max_value: ''
min_value: ''
need_restart: true
description_en: "proxy deploy and service mode: 1.client(default); 2.server"
- name: proxy_id
......@@ -400,8 +394,6 @@
- name: app_name
type: STRING
default: undefined
max_value: ''
min_value: ''
need_restart: true
description_en: current application name which proxy works for, need defined, only modified when restart
- name: enable_metadb_used
......@@ -414,8 +406,6 @@
- name: rootservice_cluster_name
type: STRING
default: undefined
max_value: ''
min_value: ''
need_restart: true
description_en: default cluster name for rootservice_list
- name: prometheus_cost_ms_unit
......@@ -435,21 +425,15 @@
- name: obproxy_sys_password
type: STRING
default: ''
max_value: ''
min_value: ''
need_restart: false
description_en: password pf obproxy sys user
- name: observer_sys_password
type: STRING
default: ''
max_value: ''
min_value: ''
need_restart: false
description_en: password of observer proxyro user
- name: observer_root_password
type: STRING
default: ''
max_value: ''
min_value: ''
need_restart: false
description_en: password of observer root user
\ No newline at end of file
......@@ -55,18 +55,17 @@ class Restart(object):
# self.cursors = None
# self.dbs = None
def connect(self):
if self.cursors is None:
self.stdio.verbose('Call %s for %s' % (self.connect_plugin, self.repository))
self.sub_io.start_loading('Connect to obproxy')
ret = self.connect_plugin(self.components, self.clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
if not ret:
self.sub_io.stop_loading('fail')
return False
self.sub_io.stop_loading('succeed')
# self.close()
self.cursors = ret.get_return('cursor')
self.dbs = ret.get_return('connect')
def connect(self, cluster_config):
self.stdio.verbose('Call %s for %s' % (self.connect_plugin, self.repository))
self.sub_io.start_loading('Connect to obproxy')
ret = self.connect_plugin(self.components, self.clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io)
if not ret:
self.sub_io.stop_loading('fail')
return False
self.sub_io.stop_loading('succeed')
# self.close()
self.cursors = ret.get_return('cursor')
self.dbs = ret.get_return('connect')
return True
def dir_read_check(self, client, path):
......@@ -77,6 +76,12 @@ class Restart(object):
def restart(self):
clients = self.clients
if self.new_cluster_config:
if not self.connect(self.cluster_config):
return False
self.stdio.verbose('Call %s for %s' % (self.reload_plugin, self.repository))
self.reload_plugin(self.components, self.clients, self.cluster_config, [], {}, self.sub_io, cursor=self.cursors, new_cluster_config=self.new_cluster_config, repository_dir=self.repository.repository_dir)
self.stdio.verbose('Call %s for %s' % (self.stop_plugin, self.repository))
if not self.stop_plugin(self.components, clients, self.cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io):
self.stdio.stop_loading('stop_loading', 'fail')
......@@ -103,16 +108,12 @@ class Restart(object):
self.stdio.stop_loading('stop_loading', 'fail')
return False
if self.connect():
if self.connect(cluster_config):
if self.bootstrap_plugin:
self.stdio.verbose('Call %s for %s' % (self.bootstrap_plugin, self.repository))
self.bootstrap_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, cursor=self.cursors)
self.stdio.verbose('Call %s for %s' % (self.display_plugin, self.repository))
ret = self.display_plugin(self.components, clients, cluster_config, self.plugin_context.cmd, self.plugin_context.options, self.sub_io, cursor=self.cursors)
if self.new_cluster_config:
self.stdio.verbose('Call %s for %s' % (self.reload_plugin, self.repository))
self.reload_plugin(self.components, self.clients, self.cluster_config, [], {}, self.sub_io,
cursor=self.cursors, new_cluster_config=self.new_cluster_config, repository_dir=self.repository.repository_dir)
return ret
return False
......
......@@ -26,6 +26,7 @@ import time
def display(plugin_context, cursor, *args, **kwargs):
stdio = plugin_context.stdio
stdio.start_loading('Wait for observer init')
cluster_config = plugin_context.cluster_config
try:
while True:
try:
......@@ -34,6 +35,9 @@ def display(plugin_context, cursor, *args, **kwargs):
if servers:
stdio.print_list(servers, ['ip', 'version', 'port', 'zone', 'status'],
lambda x: [x['svr_ip'], x['build_version'].split('_')[0], x['inner_port'], x['zone'], x['status']], title='observer')
password = cluster_config.get_global_conf().get('root_password', '')
cmd = 'obclient -h%s -P%s -uroot %s-Doceanbase' % (servers[0]['svr_ip'], servers[0]['inner_port'], '-p%s ' % password if password else '')
stdio.print(cmd)
stdio.stop_loading('succeed')
return plugin_context.return_true()
except Exception as e:
......
......@@ -2,6 +2,9 @@
target_path: bin/observer
type: bin
mode: 755
- src_path: ./home/admin/oceanbase/bin
target_path: bin
type: dir
- src_path: ./home/admin/oceanbase/etc
target_path: etc
type: dir
......
......@@ -89,6 +89,33 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
cluster_config.update_global_conf('appname', default_appname, False)
MIN_MEMORY = 8 << 30
MIN_CPU_COUNT = 16
clog_disk_utilization_threshold_max = 95
clog_disk_usage_limit_percentage_max = 98
global_config = cluster_config.get_original_global_conf()
if getattr(plugin_context.options, 'mini', False):
if not global_config.get('memory_limit_percentage') and not global_config.get('memory_limit'):
cluster_config.update_global_conf('memory_limit', format_size(MIN_MEMORY, 0), False)
if not global_config.get('datafile_size') and not global_config.get('datafile_disk_percentage'):
cluster_config.update_global_conf('datafile_size', '20G', False)
if not global_config.get('clog_disk_utilization_threshold'):
cluster_config.update_global_conf('clog_disk_utilization_threshold', clog_disk_utilization_threshold_max, False)
if not global_config.get('clog_disk_usage_limit_percentage'):
cluster_config.update_global_conf('clog_disk_usage_limit_percentage', clog_disk_usage_limit_percentage_max, False)
max_syslog_file_count_default = 4
if global_config.get('syslog_level') is None:
cluster_config.update_global_conf('syslog_level', 'INFO', False)
if global_config.get('enable_syslog_recycle') is None:
cluster_config.update_global_conf('enable_syslog_recycle', True, False)
if global_config.get('enable_syslog_wf') is None:
cluster_config.update_global_conf('enable_syslog_wf', True, False)
if global_config.get('max_syslog_file_count') is None:
cluster_config.update_global_conf('max_syslog_file_count', max_syslog_file_count_default, False)
if global_config.get('cluster_id') is None:
cluster_config.update_global_conf('cluster_id', 1, False)
for server in cluster_config.servers:
ip = server.ip
client = clients[server]
......@@ -112,18 +139,6 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
cluster_config.update_server_conf(server, 'devname', interface)
break
max_syslog_file_count_default = 4
if user_server_config.get('syslog_level') is None:
cluster_config.update_server_conf(server, 'syslog_level', 'INFO', False)
if user_server_config.get('enable_syslog_recycle') is None:
cluster_config.update_server_conf(server, 'enable_syslog_recycle', True, False)
if user_server_config.get('enable_syslog_wf') is None:
cluster_config.update_server_conf(server, 'enable_syslog_wf', True, False)
if user_server_config.get('max_syslog_file_count') is None:
cluster_config.update_server_conf(server, 'max_syslog_file_count', max_syslog_file_count_default, False)
if server_config.get('cluster_id') is None:
cluster_config.update_server_conf(server, 'cluster_id', 1, False)
dirs = {"home_path": server_config['home_path']}
dirs["data_dir"] = server_config['data_dir'] if server_config.get('data_dir') else os.path.join(server_config['home_path'], 'store')
dirs["redo_dir"] = server_config['redo_dir'] if server_config.get('redo_dir') else dirs["data_dir"]
......@@ -162,10 +177,10 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
else:
try:
memory_limit = parse_size(server_config.get('memory_limit'))
auto_set_memory = True
except:
stdio.error('memory_limit must be an integer')
return
auto_set_memory = True
auto_set_system_memory = False
if not user_server_config.get('system_memory'):
......@@ -177,12 +192,14 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
if ret and ret.stdout.strip().isdigit():
cpu_num = int(ret.stdout)
server_config['cpu_count'] = max(16, int(cpu_num - 2))
server_config['cpu_count'] = max(MIN_CPU_COUNT, int(cpu_num - 2))
else:
server_config['cpu_count'] = 16
server_config['cpu_count'] = MIN_CPU_COUNT
cluster_config.update_server_conf(server, 'cpu_count', server_config['cpu_count'], False)
elif server_config['cpu_count'] < MIN_CPU_COUNT:
cluster_config.update_server_conf(server, 'cpu_count', MIN_CPU_COUNT, False)
stdio.warn('(%s): automatically adjust the cpu_count %s' % (server, MIN_CPU_COUNT))
cluster_config.update_server_conf(server, 'cpu_count', max(16, server_config['cpu_count']), False)
# disk
if not server_config.get('datafile_size') and not user_server_config.get('datafile_disk_percentage'):
disk = {'/': 0}
......@@ -224,7 +241,6 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
clog_dir_disk = disk[clog_dir_mount]
if clog_dir_mount == data_dir_mount:
clog_disk_utilization_threshold_max = 95
disk_free = data_dir_disk['avail']
real_disk_total = data_dir_disk['total']
if mounts[dirs['home_path']] == data_dir_mount:
......@@ -274,7 +290,7 @@ def generate_config(plugin_context, deploy_config, *args, **kwargs):
datafile_size = parse_size(datafile_size_format)
clog_disk_utilization_threshold = max(80, int(100.0 * (disk_used + datafile_size + padding_size + clog_disk_size * 0.8) / real_disk_total))
clog_disk_utilization_threshold = min(clog_disk_utilization_threshold, clog_disk_utilization_threshold_max)
clog_disk_usage_limit_percentage = min(int(clog_disk_utilization_threshold / 80.0 * 95), 98)
clog_disk_usage_limit_percentage = min(int(clog_disk_utilization_threshold / 80.0 * 95), clog_disk_usage_limit_percentage_max)
cluster_config.update_server_conf(server, 'datafile_size', datafile_size_format, False)
cluster_config.update_server_conf(server, 'clog_disk_utilization_threshold', clog_disk_utilization_threshold, False)
......
......@@ -37,7 +37,7 @@ def critical(*arg, **kwargs):
def init_dir(server, client, key, path, link_path=None):
if force:
ret = client.execute_command('rm -fr %s' % path)
ret = client.execute_command('rm -fr %s' % path, timeout=-1)
if not ret:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s path' % key, msg=ret.stderr))
return False
......@@ -118,7 +118,17 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if need_clean:
client.execute_command(
"pkill -9 -u `whoami` -f '^%s/bin/observer -p %s'" % (home_path, server_config['mysql_port']))
ret = client.execute_command('rm -fr %s/*' % home_path)
if client.execute_command('bash -c \'if [[ "$(ls -d {0} 2>/dev/null)" != "" && ! -O {0} ]]; then exit 0; else exit 1; fi\''.format(home_path)):
owner = client.execute_command("ls -ld %s | awk '{print $3}'" % home_path).stdout.strip()
err_msg = ' {} is not empty, and the owner is {}'.format(home_path, owner)
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=err_msg))
continue
need_clean = True
if need_clean:
client.execute_command(
"pkill -9 -u `whoami` -f '^%s/bin/observer -p %s'" % (home_path, server_config['mysql_port']))
ret = client.execute_command('rm -fr %s/*' % home_path, timeout=-1)
if not ret:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='home path', msg=ret.stderr))
continue
......@@ -134,7 +144,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
if ret:
data_path = server_config['data_dir']
if need_clean:
ret = client.execute_command('rm -fr %s/*' % data_path)
ret = client.execute_command('rm -fr %s/*' % data_path, timeout=-1)
if not ret:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='data dir', msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=data_path)))
continue
......@@ -154,7 +164,7 @@ def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
# init_dir(server, client, key, server_config['%s_dir' % key], os.path.join(data_path, key))
log_dir = server_config['%s_dir' % key]
if force:
ret = client.execute_command('rm -fr %s/*' % log_dir)
ret = client.execute_command('rm -fr %s/*' % log_dir, timeout=-1)
if not ret:
critical(EC_FAIL_TO_INIT_PATH.format(server=server, key='%s dir' % key, msg=InitDirFailedErrorMessage.PERMISSION_DENIED.format(path=log_dir)))
continue
......
......@@ -96,7 +96,7 @@ class Restart(object):
def wait(self):
if not self.connect():
return False
self.stdio.verbose('server cneck')
self.stdio.verbose('server check')
self.broken_sql("select * from oceanbase.__all_server where status != 'active' or stop_time > 0 or start_service_time = 0")
self.broken_sql("select * from oceanbase.__all_virtual_clog_stat where is_in_sync= 0 and is_offline = 0")
return True
......
......@@ -34,35 +34,6 @@ from ssh import LocalClient
stdio = None
def parse_size(size):
_bytes = 0
if not isinstance(size, str) or size.isdigit():
_bytes = int(size)
else:
units = {"B": 1, "K": 1<<10, "M": 1<<20, "G": 1<<30, "T": 1<<40}
match = re.match(r'([1-9][0-9]*)\s*([B,K,M,G,T])', size.upper())
_bytes = int(match.group(1)) * units[match.group(2)]
return _bytes
def format_size(size, precision=1):
units = ['B', 'K', 'M', 'G']
units_num = len(units) - 1
idx = 0
if precision:
div = 1024.0
format = '%.' + str(precision) + 'f%s'
limit = 1024
else:
div = 1024
limit = 1024
format = '%d%s'
while idx < units_num and size >= limit:
size /= div
idx += 1
return format % (size, units[idx])
def exec_cmd(cmd):
stdio.verbose('execute: %s' % cmd)
process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
......@@ -74,31 +45,20 @@ def exec_cmd(cmd):
return process.returncode == 0
def run_test(plugin_context, db, cursor, odp_db, odp_cursor=None, *args, **kwargs):
def run_test(plugin_context, *args, **kwargs):
def get_option(key, default=''):
if key in opt_keys:
opt_keys.remove(key)
value = getattr(options, key, default)
if value is None:
value = default
return value
def execute(cursor, query, args=None):
msg = query % tuple(args) if args is not None else query
stdio.verbose('execute sql: %s' % msg)
# stdio.verbose("query: %s. args: %s" % (query, args))
try:
cursor.execute(query, args)
return cursor.fetchone()
except:
msg = 'execute sql exception: %s' % msg
stdio.exception(msg)
raise Exception(msg)
global stdio
cluster_config = plugin_context.cluster_config
stdio = plugin_context.stdio
options = plugin_context.options
optimization = get_option('optimization') > 0
ob_optimization = get_option('ob_optimization')
opt_keys = list(vars(options).keys())
for used_key in ['component', 'test_server', 'skip_cluster_status_check', 'obclient_bin', 'optimization']:
opt_keys.remove(used_key)
host = get_option('host', '127.0.0.1')
port = get_option('port', 2881)
......@@ -116,157 +76,10 @@ def run_test(plugin_context, db, cursor, odp_db, odp_cursor=None, *args, **kwarg
skip_trx = get_option('skip_trx', '').lower()
percentile = get_option('percentile', None)
script_name = get_option('script_name', 'oltp_point_select.lua')
obclient_bin = get_option('obclient_bin', 'obclient')
sysbench_bin = get_option('sysbench_bin', 'sysbench')
sysbench_script_dir = get_option('sysbench_script_dir', '/usr/sysbench/share/sysbench')
if tenant_name == 'sys':
stdio.error('DO NOT use sys tenant for testing.')
return
ret = LocalClient.execute_command('%s --help' % obclient_bin, stdio=stdio)
if not ret:
stdio.error('%s\n%s is not an executable file. Please use `--obclient-bin` to set.\nYou may not have obclient installed' % (ret.stderr, obclient_bin))
return
ret = LocalClient.execute_command('%s --help' % sysbench_bin, stdio=stdio)
if not ret:
stdio.error('%s\n%s is not an executable file. Please use `--sysbench-bin` to set.\nYou may not have ob-sysbench installed' % (ret.stderr, sysbench_bin))
return
if not script_name.endswith('.lua'):
script_name += '.lua'
script_path = os.path.join(sysbench_script_dir, script_name)
if not os.path.exists(script_path):
stdio.error('No such file %s. Please use `--sysbench-script-dir` to set sysbench scrpit dir.\nYou may not have ob-sysbench installed' % script_path)
return
sql = "select * from oceanbase.gv$tenant where tenant_name = %s"
max_cpu = 2
tenant_meta = None
try:
stdio.verbose('execute sql: %s' % (sql % tenant_name))
cursor.execute(sql, [tenant_name])
tenant_meta = cursor.fetchone()
if not tenant_meta:
stdio.error('Tenant %s not exists. Use `obd cluster tenant create` to create tenant.' % tenant_name)
return
sql = "select * from oceanbase.__all_resource_pool where tenant_id = %d" % tenant_meta['tenant_id']
pool = execute(cursor, sql)
sql = "select * from oceanbase.__all_unit_config where unit_config_id = %d" % pool['unit_config_id']
max_cpu = execute(cursor, sql)['max_cpu']
except:
stdio.exception('')
return
exec_sql_cmd = "%s -h%s -P%s -u%s@%s %s -A -e" % (obclient_bin, host, port, user, tenant_name, ("-p'%s'" % password) if password else '')
ret = LocalClient.execute_command('%s "%s"' % (exec_sql_cmd, 'create database if not exists %s;' % mysql_db), stdio=stdio)
if not ret:
stdio.error(ret.stderr)
return
sql = ''
odp_configs_done = []
system_configs_done = []
tenant_variables_done = []
odp_configs = [
# [配置名, 新值, 旧值, 替换条件: lambda n, o: n != o]
# ['enable_compression_protocol', False, False, lambda n, o: n != o],
['proxy_mem_limited', format_size(min(max(threads * (8 << 10), 2 << 30), 4 << 30), 0), 0, lambda n, o: parse_size(n) > parse_size(o)],
['enable_prometheus', False, False, lambda n, o: n != o],
['enable_metadb_used', False, False, lambda n, o: n != o],
['enable_standby', False, False, lambda n, o: n != o],
['enable_strict_stat_time', False, False, lambda n, o: n != o],
['use_local_dbconfig', True, True, lambda n, o: n != o],
]
system_configs = [
# [配置名, 新值, 旧值, 替换条件: lambda n, o: n != o, 是否是租户级]
['enable_auto_leader_switch', False, False, lambda n, o: n != o, False],
['enable_one_phase_commit', False, False, lambda n, o: n != o, False],
['weak_read_version_refresh_interval', '5s', '5s', lambda n, o: n != o, False],
['syslog_level', 'PERF', 'PERF', lambda n, o: n != o, False],
['max_syslog_file_count', 100, 100, lambda n, o: n != o, False],
['enable_syslog_recycle', True, True, lambda n, o: n != o, False],
['trace_log_slow_query_watermark', '10s', '10s', lambda n, o: n != o, False],
['large_query_threshold', '1s', '1s', lambda n, o: n != o, False],
['clog_sync_time_warn_threshold', '200ms', '200ms', lambda n, o: n != o, False],
['syslog_io_bandwidth_limit', '10M', '10M', lambda n, o: n != o, False],
['enable_sql_audit', False, False, lambda n, o: n != o, False],
['sleep', 1],
['enable_perf_event', False, False, lambda n, o: n != o, False],
['clog_max_unconfirmed_log_count', 5000, 5000, lambda n, o: n != o, False],
['autoinc_cache_refresh_interval', '86400s', '86400s', lambda n, o: n != o, False],
['enable_early_lock_release', False, False, lambda n, o: n != o, True],
['default_compress_func', 'lz4_1.0', 'lz4_1.0', lambda n, o: n != o, False],
['_clog_aggregation_buffer_amount', 4, 4, lambda n, o: n != o, False],
['_flush_clog_aggregation_buffer_timeout', '1ms', '1ms', lambda n, o: n != o, False],
]
try:
if odp_cursor and optimization:
for config in odp_configs:
sql = 'show proxyconfig like "%s"' % config[0]
ret = execute(odp_cursor, sql)
if ret:
config[2] = ret['value']
if config[3](config[1], config[2]):
sql = 'alter proxyconfig set %s=%%s' % config[0]
odp_configs_done.append(config)
execute(odp_cursor, sql, [config[1]])
tenant_q = ' tenant="%s"' % tenant_name
server_num = len(cluster_config.servers)
if optimization and ob_optimization:
for config in system_configs:
if config[0] == 'sleep':
sleep(config[1])
system_configs_done.append(config)
continue
sql = 'show parameters like "%s"' % config[0]
if config[4]:
sql += tenant_q
ret = execute(cursor, sql)
if ret:
config[2] = ret['value']
if config[3](config[1], config[2]):
sql = 'alter system set %s=%%s' % config[0]
if config[4]:
sql += tenant_q
system_configs_done.append(config)
execute(cursor, sql, [config[1]])
sql = "select count(1) server_num from oceanbase.__all_server where status = 'active'"
ret = execute(cursor, sql)
if ret:
server_num = ret.get("server_num", server_num)
parallel_max_servers = int(max_cpu * 10)
parallel_servers_target = int(max_cpu * server_num * 8)
tenant_variables = [
# [变量名, 新值, 旧值, 替换条件: lambda n, o: n != o]
['ob_timestamp_service', 1, 1, lambda n, o: n != o],
['autocommit', 1, 1, lambda n, o: n != o],
['ob_query_timeout', 36000000000, 36000000000, lambda n, o: n != o],
['ob_trx_timeout', 36000000000, 36000000000, lambda n, o: n != o],
['max_allowed_packet', 67108864, 67108864, lambda n, o: n != o],
['ob_sql_work_area_percentage', 100, 100, lambda n, o: n != o],
['parallel_max_servers', parallel_max_servers, parallel_max_servers, lambda n, o: n != o],
['parallel_servers_target', parallel_servers_target, parallel_servers_target, lambda n, o: n != o]
]
select_sql_t = "select value from oceanbase.__all_virtual_sys_variable where tenant_id = %d and name = '%%s'" % tenant_meta['tenant_id']
update_sql_t = "ALTER TENANT %s SET VARIABLES %%s = %%%%s" % tenant_name
for config in tenant_variables:
sql = select_sql_t % config[0]
ret = execute(cursor, sql)
if ret:
value = ret['value']
config[2] = int(value) if isinstance(value, str) and value.isdigit() else value
if config[3](config[1], config[2]):
sql = update_sql_t % config[0]
tenant_variables_done.append(config)
execute(cursor, sql, [config[1]])
sysbench_cmd = "cd %s; %s %s --mysql-host=%s --mysql-port=%s --mysql-user=%s@%s --mysql-db=%s" % (sysbench_script_dir, sysbench_bin, script_name, host, port, user, tenant_name, mysql_db)
if password:
......@@ -289,35 +102,11 @@ def run_test(plugin_context, db, cursor, odp_db, odp_cursor=None, *args, **kwarg
sysbench_cmd += ' --skip_trx=%s' % skip_trx
if percentile:
sysbench_cmd += ' --percentile=%s' % percentile
for opt_key in opt_keys:
sysbench_cmd += ' --%s=%s' % (opt_key.replace('_', '-'), getattr(options, opt_key))
if exec_cmd('%s cleanup' % sysbench_cmd) and exec_cmd('%s prepare' % sysbench_cmd) and exec_cmd('%s --db-ps-mode=disable run' % sysbench_cmd):
return plugin_context.return_true()
except KeyboardInterrupt:
pass
except:
stdio.exception('')
finally:
try:
if optimization:
for config in tenant_variables_done[::-1]:
if config[3](config[1], config[2]):
sql = update_sql_t % config[0]
execute(cursor, sql, [config[2]])
for config in system_configs_done[::-1]:
if config[0] == 'sleep':
sleep(config[1])
continue
if config[3](config[1], config[2]):
sql = 'alter system set %s=%%s' % config[0]
if config[4]:
sql += tenant_q
execute(cursor, sql, [config[2]])
if odp_cursor:
for config in odp_configs_done[::-1]:
if config[3](config[1], config[2]):
sql = 'alter proxyconfig set %s=%%s' % config[0]
execute(odp_cursor, sql, [config[2]])
except:
pass
stdio.exception('')
\ No newline at end of file
......@@ -175,34 +175,11 @@ def build(plugin_context, cursor, odp_cursor, *args, **kwargs):
# load data
stdio.verbose('Start to load data.')
cmd = '{java_bin} -cp {cp} -Dprop={prop} LoadData'.format(java_bin=java_bin, cp=bmsql_classpath, prop=bmsql_prop_path)
stdio.start_progressbar('Load data ', warehouses, widget_type='simple_progress')
try:
stdio.verbose('local execute: %s' % cmd)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while p.poll() is None:
count = get_table_rows('bmsql_warehouse')
if count:
stdio.update_progressbar(min(count, warehouses - 1))
time.sleep(10)
code = p.returncode
output = p.stdout.read().decode()
verbose_msg = 'exited code %s' % code
verbose_msg += ', output:\n%s' % output
subprocess.call(cmd, shell=True, stderr=subprocess.STDOUT)
except:
output = ''
code = 255
verbose_msg = 'unknown error'
stdio.exception('')
stdio.verbose(verbose_msg)
if code != 0:
stdio.interrupt_progressbar()
stdio.error(EC_TPCC_LOAD_DATA_FAILED)
return
if re.match(r'.*Worker \d+: ERROR: .*', output, re.S):
stdio.interrupt_progressbar()
stdio.error(EC_TPCC_LOAD_DATA_FAILED)
return
stdio.finish_progressbar()
stdio.exception('failed to load data')
# create index
stdio.start_loading('create index')
......
......@@ -174,6 +174,7 @@ def pre_test(plugin_context, cursor, odp_cursor, *args, **kwargs):
cpu_count = int(serv.get('cpu_total', 0) + 2)
min_cpu = cpu_count if min_cpu is None else min(cpu_count, min_cpu)
cpu_total += cpu_count
server_num = len(all_services)
except Exception as e:
stdio.exception(e)
stdio.error('Fail to get server status')
......@@ -278,5 +279,15 @@ def pre_test(plugin_context, cursor, odp_cursor, *args, **kwargs):
warehouses=warehouses,
cpu_total=cpu_total,
max_memory=max_memory,
max_cpu=max_cpu
max_cpu=max_cpu,
tenant_id=tenant_meta['tenant_id'],
tenant=tenant_name,
tmp_dir=tmp_dir,
server_num=server_num,
obclient_bin=obclient_bin,
host=host,
port=port,
user=user,
password=password,
database=db_name
)
......@@ -32,7 +32,25 @@ from ssh import LocalClient
from tool import DirectoryUtil
def pre_test(plugin_context, *args, **kwargs):
def format_size(size, precision=1):
units = ['B', 'K', 'M', 'G']
units_num = len(units) - 1
idx = 0
if precision:
div = 1024.0
format = '%.' + str(precision) + 'f%s'
limit = 1024
else:
div = 1024
limit = 1024
format = '%d%s'
while idx < units_num and size >= limit:
size /= div
idx += 1
return format % (size, units[idx])
def pre_test(plugin_context, cursor, *args, **kwargs):
def get_option(key, default=''):
value = getattr(options, key, default)
if not value:
......@@ -50,6 +68,18 @@ def pre_test(plugin_context, *args, **kwargs):
stdio.verbose('get %s_path: %s' % (key, path))
return path if path else default
def execute(cursor, query, args=None):
msg = query % tuple(args) if args is not None else query
stdio.verbose('execute sql: %s' % msg)
stdio.verbose("query: %s. args: %s" % (query, args))
try:
cursor.execute(query, args)
return cursor.fetchone()
except:
msg = 'execute sql exception: %s' % msg
stdio.exception(msg)
raise Exception(msg)
def local_execute_command(command, env=None, timeout=None):
return LocalClient.execute_command(command, env, timeout, stdio)
......@@ -65,6 +95,12 @@ def pre_test(plugin_context, *args, **kwargs):
disable_transfer = get_option('disable_transfer', False)
remote_tbl_dir = get_option('remote_tbl_dir')
tenant_name = get_option('tenant', 'test')
host = get_option('host', '127.0.0.1')
port = get_option('port', 2881)
mysql_db = get_option('database', 'test')
user = get_option('user', 'root')
password = get_option('password', '')
if tenant_name == 'sys':
stdio.error('DO NOT use sys tenant for testing.')
return
......@@ -91,8 +127,35 @@ def pre_test(plugin_context, *args, **kwargs):
stdio.verbose('set tmp_dir: %s' % tmp_dir)
setattr(options, 'tmp_dir', tmp_dir)
sql = "select * from oceanbase.gv$tenant where tenant_name = %s"
try:
stdio.verbose('execute sql: %s' % (sql % tenant_name))
cursor.execute(sql, [tenant_name])
tenant_meta = cursor.fetchone()
if not tenant_meta:
stdio.error('Tenant %s not exists. Use `obd cluster tenant create` to create tenant.' % tenant_name)
return
sql = "select * from oceanbase.__all_resource_pool where tenant_id = %d" % tenant_meta['tenant_id']
pool = execute(cursor, sql)
sql = "select * from oceanbase.__all_unit_config where unit_config_id = %d" % pool['unit_config_id']
tenant_unit = execute(cursor, sql)
max_cpu = tenant_unit['max_cpu']
min_memory = tenant_unit['min_memory']
unit_count = pool['unit_count']
except:
stdio.error('fail to get tenant info')
return
server_num = len(cluster_config.servers)
sql = "select count(1) server_num from oceanbase.__all_server where status = 'active'"
ret = execute(cursor, sql)
if ret:
server_num = ret.get("server_num", server_num)
if get_option('test_only'):
return plugin_context.return_true()
return plugin_context.return_true(
max_cpu=max_cpu, min_memory=min_memory, unit_count=unit_count, server_num=server_num, tenant=tenant_name,
tenant_id=tenant_meta['tenant_id'], format_size=format_size
)
if not remote_tbl_dir:
stdio.error('Please use --remote-tbl-dir to set a dir for remote tbl files')
......@@ -144,6 +207,11 @@ def pre_test(plugin_context, *args, **kwargs):
stdio.stop_loading('succeed')
stdio.verbose('set tbl_path: %s' % tbl_path)
setattr(options, 'tbl_path', tbl_path)
return plugin_context.return_true()
return plugin_context.return_true(
obclient_bin=obclient_bin, host=host, port=port, user=user, password=password, database=mysql_db,
max_cpu=max_cpu, min_memory=min_memory, unit_count=unit_count, server_num=server_num, tenant=tenant_name,
tenant_id=tenant_meta['tenant_id'], format_size=format_size
)
......@@ -45,24 +45,6 @@ def parse_size(size):
return _bytes
def format_size(size, precision=1):
units = ['B', 'K', 'M', 'G']
units_num = len(units) - 1
idx = 0
if precision:
div = 1024.0
format = '%.' + str(precision) + 'f%s'
limit = 1024
else:
div = 1024
limit = 1024
format = '%d%s'
while idx < units_num and size >= limit:
size /= div
idx += 1
return format % (size, units[idx])
def exec_cmd(cmd):
stdio.verbose('execute: %s' % cmd)
process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
......@@ -101,7 +83,6 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
clients = plugin_context.clients
options = plugin_context.options
optimization = get_option('optimization') > 0
not_test_only = not get_option('test_only')
host = get_option('host', '127.0.0.1')
......@@ -118,30 +99,10 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
sql_path = sorted(sql_path, key=lambda x: (len(x), x))
sql = "select * from oceanbase.gv$tenant where tenant_name = %s"
max_cpu = 2
max_cpu = kwargs.get('max_cpu', 2)
tenant_id = kwargs.get('tenant_id')
unit_count = kwargs.get('unit_count', 0)
cpu_total = 0
min_memory = 0
unit_count = 0
tenant_meta = None
tenant_unit = None
try:
stdio.verbose('execute sql: %s' % (sql % tenant_name))
cursor.execute(sql, [tenant_name])
tenant_meta = cursor.fetchone()
if not tenant_meta:
stdio.error('Tenant %s not exists. Use `obd cluster tenant create` to create tenant.' % tenant_name)
return
sql = "select * from oceanbase.__all_resource_pool where tenant_id = %d" % tenant_meta['tenant_id']
pool = execute(cursor, sql)
sql = "select * from oceanbase.__all_unit_config where unit_config_id = %d" % pool['unit_config_id']
tenant_unit = execute(cursor, sql)
max_cpu = tenant_unit['max_cpu']
min_memory = tenant_unit['min_memory']
unit_count = pool['unit_count']
except:
stdio.error('fail to get tenant info')
return
if not_test_only:
sql_cmd_prefix = '%s -h%s -P%s -u%s@%s %s -A' % (obclient_bin, host, port, user, tenant_name, ("-p'%s'" % password) if password else '')
......@@ -158,7 +119,6 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
stdio.error(ret.stderr)
return
for server in cluster_config.servers:
client = clients[server]
ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
......@@ -167,96 +127,17 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
else:
server_config = cluster_config.get_server_conf(server)
cpu_total += int(server_config.get('cpu_count', 0))
sql = ''
system_configs_done = []
tenant_variables_done = []
try:
cache_wash_threshold = format_size(int(min_memory * 0.2), 0)
system_configs = [
# [配置名, 新值, 旧值, 替换条件: lambda n, o: n != o, 是否是租户级]
['syslog_level', 'PERF', 'PERF', lambda n, o: n != o, False],
['max_syslog_file_count', 100, 100, lambda n, o: n != o, False],
['enable_syslog_recycle', True, True, lambda n, o: n != o, False],
['enable_merge_by_turn', False, False, lambda n, o: n != o, False],
['trace_log_slow_query_watermark', '100s', '100s', lambda n, o: n != o, False],
['max_kept_major_version_number', 1, 1, lambda n, o: n != o, False],
['enable_sql_operator_dump', True, True, lambda n, o: n != o, False],
['_hash_area_size', '3g', '3g', lambda n, o: n != o, False],
['memstore_limit_percentage', 50, 50, lambda n, o: n != o, False],
['enable_rebalance', False, False, lambda n, o: n != o, False],
['memory_chunk_cache_size', '1g', '1g', lambda n, o: n != o, False],
['minor_freeze_times', 5, 5, lambda n, o: n != o, False],
['merge_thread_count', 20, 20, lambda n, o: n != o, False],
['cache_wash_threshold', cache_wash_threshold, cache_wash_threshold, lambda n, o: n != o, False],
['ob_enable_batched_multi_statement', True, True, lambda n, o: n != o, False],
]
tenant_q = ' tenant="%s"' % tenant_name
server_num = len(cluster_config.servers)
if optimization:
for config in system_configs:
if config[0] == 'sleep':
time.sleep(config[1])
system_configs_done.append(config)
continue
sql = 'show parameters like "%s"' % config[0]
if config[4]:
sql += tenant_q
ret = execute(cursor, sql)
if ret:
config[2] = ret['value']
if config[3](config[1], config[2]):
sql = 'alter system set %s=%%s' % config[0]
if config[4]:
sql += tenant_q
system_configs_done.append(config)
execute(cursor, sql, [config[1]])
sql = "select count(1) server_num from oceanbase.__all_server where status = 'active'"
ret = execute(cursor, sql)
if ret:
server_num = ret.get("server_num", server_num)
parallel_max_servers = min(int(max_cpu * 10), 1800)
parallel_servers_target = int(max_cpu * server_num * 8)
tenant_variables = [
# [变量名, 新值, 旧值, 替换条件: lambda n, o: n != o]
['ob_sql_work_area_percentage', 80, 80, lambda n, o: n != o],
['optimizer_use_sql_plan_baselines', True, True, lambda n, o: n != o],
['optimizer_capture_sql_plan_baselines', True, True, lambda n, o: n != o],
['ob_query_timeout', 36000000000, 36000000000, lambda n, o: n != o],
['ob_trx_timeout', 36000000000, 36000000000, lambda n, o: n != o],
['max_allowed_packet', 67108864, 67108864, lambda n, o: n != o],
['secure_file_priv', "", "", lambda n, o: n != o],
['parallel_max_servers', parallel_max_servers, parallel_max_servers, lambda n, o: n != o],
['parallel_servers_target', parallel_servers_target, parallel_servers_target, lambda n, o: n != o]
]
select_sql_t = "select value from oceanbase.__all_virtual_sys_variable where tenant_id = %d and name = '%%s'" % tenant_meta['tenant_id']
update_sql_t = "ALTER TENANT %s SET VARIABLES %%s = %%%%s" % tenant_name
for config in tenant_variables:
sql = select_sql_t % config[0]
ret = execute(cursor, sql)
if ret:
value = ret['value']
config[2] = int(value) if isinstance(value, str) and value.isdigit() else value
if config[3](config[1], config[2]):
sql = update_sql_t % config[0]
tenant_variables_done.append(config)
execute(cursor, sql, [config[1]])
else:
sql = "select value from oceanbase.__all_virtual_sys_variable where tenant_id = %d and name = 'secure_file_priv'" % tenant_meta['tenant_id']
ret = execute(cursor, sql)['value']
if ret is None:
stdio.error('Access denied. Please set `secure_file_priv` to "".')
return
if ret:
for path in tbl_path:
if not path.startswith(ret):
stdio.error('Access denied. Please set `secure_file_priv` to "".')
return
sql = "select value from oceanbase.__all_virtual_sys_variable where tenant_id = %d and name = 'secure_file_priv'" % tenant_id
ret = execute(cursor, sql)['value']
if ret is None:
stdio.error('Access denied. Please set `secure_file_priv` to "".')
return
if ret:
for path in tbl_path:
if not path.startswith(ret):
stdio.error('Access denied. Please set `secure_file_priv` to "".')
return
parallel_num = int(max_cpu * unit_count)
......@@ -331,7 +212,7 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
for path in sql_path:
_, fn = os.path.split(path)
log_path = os.path.join(tmp_dir, '%s.log' % fn)
ret = local_execute_command('source %s | %s -c > %s' % (path, sql_cmd_prefix, log_path))
ret = local_execute_command('echo source %s | %s -c > %s' % (path, sql_cmd_prefix, log_path))
if not ret:
raise Exception(ret.stderr)
stdio.stop_loading('succeed')
......@@ -350,28 +231,9 @@ def run_test(plugin_context, db, cursor, *args, **kwargs):
if not ret:
raise Exception(ret.stderr)
stdio.print('Total Cost: %.1fs' % total_cost)
return plugin_context.return_true()
except KeyboardInterrupt:
stdio.stop_loading('fail')
except Exception as e:
stdio.stop_loading('fail')
stdio.exception(str(e))
finally:
try:
if optimization:
for config in tenant_variables_done[::-1]:
if config[3](config[1], config[2]):
sql = update_sql_t % config[0]
execute(cursor, sql, [config[2]])
for config in system_configs_done[::-1]:
if config[0] == 'sleep':
time.sleep(config[1])
continue
if config[3](config[1], config[2]):
sql = 'alter system set %s=%%s' % config[0]
if config[4]:
sql += tenant_q
execute(cursor, sql, [config[2]])
except:
pass
requests==2.24.0
rpmfile==1.0.8
paramiko==2.7.2
paramiko==2.10.1
backports.lzma==0.0.14
MySQL-python==1.2.5
ruamel.yaml.clib==0.2.2
......@@ -13,4 +13,5 @@ halo==0.0.30
pycryptodome==3.10.1
inspect2==0.1.2
six==1.16.0
pyinstaller==3.6
\ No newline at end of file
pyinstaller==3.6
bcrypt==3.1.7
\ No newline at end of file
rpmfile==1.0.8
paramiko==2.7.2
paramiko==2.10.1
requests==2.25.1
PyMySQL==1.0.2
ruamel.yaml==0.17.4
......@@ -11,3 +11,5 @@ pycryptodome==3.10.1
inspect2==0.1.2
six==1.16.0
pyinstaller>=4.3
bcrypt==4.0.0
configparser>=5.2.0
\ No newline at end of file
......@@ -23,6 +23,7 @@ from __future__ import absolute_import, division, print_function
import enum
import getpass
import os
import tempfile
import warnings
from glob import glob
......@@ -39,8 +40,9 @@ from multiprocessing.queues import Empty
from multiprocessing import Queue, Process
from multiprocessing.pool import ThreadPool
from tool import COMMAND_ENV, DirectoryUtil
from tool import COMMAND_ENV, DirectoryUtil, FileUtil
from _stdio import SafeStdio
from _environ import ENV_DISABLE_RSYNC
__all__ = ("SshClient", "SshConfig", "LocalClient", "ConcurrentExecutor")
......@@ -104,6 +106,9 @@ class ConcurrentExecutor(object):
self.futures.append(ret)
return ret
def size(self):
return len(self.futures)
@staticmethod
def execute(future):
client = SshClient(future.client.config, future.stdio)
......@@ -160,10 +165,23 @@ class LocalClient(SafeStdio):
if os.path.exists(os.path.dirname(local_dir)) and not glob(local_dir):
stdio.verbose("%s is empty" % local_dir)
return True
if LocalClient.execute_command('mkdir -p %s && cp -fr %s %s' % (remote_dir, local_dir, remote_dir), stdio=stdio):
if LocalClient.execute_command('mkdir -p %s && cp -frL %s %s' % (remote_dir, local_dir, remote_dir), stdio=stdio):
return True
return False
@staticmethod
def write_file(content, file_path, mode='w', stdio=None):
stdio.verbose('write {} to {}'.format(content, file_path))
try:
with FileUtil.open(file_path, mode, stdio=stdio) as f:
f.write(content)
f.flush()
return True
except:
stdio.exception('')
return False
@staticmethod
def get_file(local_path, remote_path, stdio=None):
return LocalClient.put_file(remote_path, local_path, stdio=stdio)
......@@ -231,7 +249,7 @@ class SshClient(SafeStdio):
stdio.verbose('%s@%s delete env %s' % (self.config.username, self.config.host, key))
del self.env[key]
self._update_env()
def __str__(self):
return '%s@%s:%d' % (self.config.username, self.config.host, self.config.port)
......@@ -336,12 +354,12 @@ class SshClient(SafeStdio):
verbose_msg = '%s execute: %s ' % (self.config, command)
stdio.verbose(verbose_msg, end='')
command = '%s %s;echo -e "\n$?\c"' % (self.env_str, command.strip(';'))
command = '%s %s;echo -e "\n$?\c"' % (self.env_str, command.strip(';').lstrip('\n'))
return self._execute_command(command, retry=3, timeout=timeout, stdio=stdio)
@property
def disable_rsync(self):
return COMMAND_ENV.get("OBD_DISABLE_RSYNC") == "1"
return COMMAND_ENV.get(ENV_DISABLE_RSYNC) == "1"
@property
def remote_transporter(self):
......@@ -367,6 +385,22 @@ class SshClient(SafeStdio):
return False
return self._put_file(local_path, remote_path, stdio=stdio)
def write_file(self, content, file_path, mode='w', stdio=None):
if self._is_local():
return LocalClient.write_file(content, file_path, mode, stdio)
return self._write_file(content, file_path, mode, stdio)
def _write_file(self, content, file_path, mode='w', stdio=None):
stdio.verbose('write {} to {}: {}'.format(content, self, file_path))
try:
with tempfile.NamedTemporaryFile(mode=mode) as f:
f.write(content)
f.flush()
return self.put_file(f.name, file_path, stdio=stdio)
except:
stdio.exception('')
return False
@property
def _put_file(self):
if self.remote_transporter == RemoteTransporter.RSYNC:
......
......@@ -31,6 +31,8 @@ import signal
import shutil
import re
import json
import hashlib
from io import BytesIO
from ruamel.yaml import YAML, YAMLContextManager, representer
......@@ -297,6 +299,25 @@ class FileUtil(object):
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
@staticmethod
def checksum(target_path, stdio=None):
from ssh import LocalClient
if not os.path.isfile(target_path):
info = 'No such file: ' + target_path
if stdio:
getattr(stdio, 'error', print)(info)
return False
else:
raise IOError(info)
ret = LocalClient.execute_command('md5sum {}'.format(target_path), stdio=stdio)
if ret:
return ret.stdout.strip().split(' ')[0].encode('utf-8')
else:
m = hashlib.md5()
with open(target_path, 'rb') as f:
m.update(f.read())
return m.hexdigest().encode(sys.getdefaultencoding())
@staticmethod
def copy_fileobj(fsrc, fdst):
fsrc_read = fsrc.read
......@@ -465,6 +486,18 @@ class YamlLoader(YAML):
self.stdio.exception('Parsing error:\n%s' % e)
raise e
def loads(self, yaml_content):
try:
stream = BytesIO()
yaml_content = str(yaml_content).encode()
stream.write(yaml_content)
stream.seek(0)
return self.load(stream)
except Exception as e:
if getattr(self.stdio, 'exception', False):
self.stdio.exception('Parsing error:\n%s' % e)
raise e
def dump(self, data, stream=None, transform=None):
try:
return super(YamlLoader, self).dump(data, stream=stream, transform=transform)
......@@ -473,6 +506,20 @@ class YamlLoader(YAML):
self.stdio.exception('dump error:\n%s' % e)
raise e
def dumps(self, data, transform=None):
try:
stream = BytesIO()
self.dump(data, stream=stream, transform=transform)
stream.seek(0)
content = stream.read()
if sys.version_info.major == 2:
return content
return content.decode()
except Exception as e:
if getattr(self.stdio, 'exception', False):
self.stdio.exception('dumps error:\n%s' % e)
raise e
_KEYCRE = re.compile(r"\$(\w+)")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册