未验证 提交 334f1c66 编写于 作者: R Rongfeng Fu 提交者: GitHub

v1.1.0 (#31)

上级 a7e40da0
......@@ -395,8 +395,8 @@ class ClusterStopCommand(ClusterMirrorCommand):
class ClusterDestroyCommand(ClusterMirrorCommand):
def __init__(self):
super(ClusterDestroyCommand, self).__init__('destroy', 'Destroy a cluster had deployed')
self.parser.add_option('-f', '--force-kill', action='store_true', help="force kill when observer is running")
super(ClusterDestroyCommand, self).__init__('destroy', 'Destroy a deployed cluster.')
self.parser.add_option('-f', '--force-kill', action='store_true', help="Force kill the running observer process in the working directory.")
def _do_command(self, obd):
if self.cmds:
......
create resource unit box1 max_cpu 2, max_memory 1073741824, max_iops 128, max_disk_size '5G', max_session_num 64, MIN_CPU=1, MIN_MEMORY=1073741824, MIN_IOPS=128;
create resource pool pool1 unit = 'box1', unit_num = 1;
create tenant ora_tt replica_num = 1, resource_pool_list=('pool1') set ob_tcp_invited_nodes='%', ob_compatibility_mode='oracle';
alter tenant ora_tt set variables autocommit='on';
alter tenant ora_tt set variables nls_date_format='YYYY-MM-DD HH24:MI:SS';
alter tenant ora_tt set variables nls_timestamp_format='YYYY-MM-DD HH24:MI:SS.FF';
alter tenant ora_tt set variables nls_timestamp_tz_format='YYYY-MM-DD HH24:MI:SS.FF TZR TZD';
\ No newline at end of file
system sleep 5;
alter system set balancer_idle_time = '60s';
create user 'admin' IDENTIFIED BY 'admin';
use oceanbase;
create database if not exists test;
use test;
grant all on *.* to 'admin' WITH GRANT OPTION;
set global ob_enable_jit='OFF';
alter system set large_query_threshold='1s';
alter system set syslog_level='info';
alter system set syslog_io_bandwidth_limit='30M';
alter system set trx_try_wait_lock_timeout='0';
alter system set zone_merge_concurrency=0;
alter system set merger_completion_percentage=100;
alter system set trace_log_slow_query_watermark='500ms';
alter system set minor_freeze_times=30;
alter system set clog_sync_time_warn_threshold = '1000ms';
alter system set trace_log_slow_query_watermark = '10s';
alter system set enable_sql_operator_dump = 'false';
alter system set rpc_timeout=1000000000;
create resource unit tpch_box1 min_memory '100g', max_memory '100g', max_disk_size '1000g', max_session_num 64, min_cpu=9, max_cpu=9, max_iops 128, min_iops=128;
create resource pool tpch_pool1 unit = 'tpch_box1', unit_num = 1, zone_list = ('z1', 'z2', 'z3');
create tenant oracle replica_num = 3, resource_pool_list=('tpch_pool1') set ob_tcp_invited_nodes='%', ob_compatibility_mode='oracle';
alter tenant oracle set variables autocommit='on';
alter tenant oracle set variables nls_date_format='yyyy-mm-dd hh24:mi:ss';
alter tenant oracle set variables nls_timestamp_format='yyyy-mm-dd hh24:mi:ss.ff';
alter tenant oracle set variables nls_timestamp_tz_format='yyyy-mm-dd hh24:mi:ss.ff tzr tzd';
alter tenant oracle set variables ob_query_timeout=7200000000;
alter tenant oracle set variables ob_trx_timeout=7200000000;
alter tenant oracle set variables max_allowed_packet=67108864;
alter tenant oracle set variables ob_enable_jit='OFF';
alter tenant oracle set variables ob_sql_work_area_percentage=80;
alter tenant oracle set variables parallel_max_servers=512;
alter tenant oracle set variables parallel_servers_target=512;
select count(*) from oceanbase.__all_server group by zone limit 1 into @num;
set @sql_text = concat('alter resource pool tpch_pool1', ' unit_num = ', @num);
prepare stmt from @sql_text;
execute stmt;
deallocate prepare stmt;
alter session set current_schema = SYS;
create user root IDENTIFIED BY root;
grant all on *.* to root WITH GRANT OPTION;
grant dba to root;
create user test IDENTIFIED BY test;
grant all on *.* to test WITH GRANT OPTION;
grant dba to test;
grant all privileges to test;
create user admin IDENTIFIED BY admin;
grant all on *.* to admin WITH GRANT OPTION;
grant dba to admin;
grant all privileges to admin;
alter user LBACSYS account unlock;
grant all on *.* to LBACSYS WITH GRANT OPTION;
grant dba to LBACSYS;
alter user ORAAUDITOR account unlock;
grant all on *.* to ORAAUDITOR WITH GRANT OPTION;
grant dba to ORAAUDITOR;
alter system set "_enable_split_partition" = 'true';
grant read on directory dd to TEST;
set global secure_file_priv = '';
case 1: commit
show variables like 'autocommit';
Variable_name Value
autocommit ON
drop table if exists t1;
create table t1 (c1 int primary key, c2 varchar(1024));
set autocommit=0;
insert into t1 values (1, '中国');
select * from t1 where c1 = 1 for update;
c1 c2
1 中国
commit;
set autocommit=1;
select * from t1;
c1 c2
1 中国
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
def generate_config(plugin_context, deploy_config, *args, **kwargs):
    """Fill in missing obproxy configuration with sensible defaults.

    Ensures every obproxy server has a ``home_path``, disables
    ``enable_cluster_checkout``, and — when an oceanbase/oceanbase-ce
    component is part of the deploy — derives ``rs_list`` (one observer
    per zone) and ``cluster_name`` for servers that did not set them.

    Args:
        plugin_context: OBD plugin context (cluster_config, stdio, ...).
        deploy_config: full deploy configuration used to locate the
            observer component.

    Returns:
        ``plugin_context.return_true()`` on success, ``None`` when any
        server is missing ``home_path``.
    """
    cluster_config = plugin_context.cluster_config
    # (removed unused local: plugin_context.clients was fetched but never used)
    stdio = plugin_context.stdio
    success = True
    stdio.start_loading('Generate obproxy configuration')
    for server in cluster_config.servers:
        server_config = cluster_config.get_server_conf(server)
        if not server_config.get('home_path'):
            stdio.error("obproxy %s: missing configuration 'home_path' in configuration file" % server)
            success = False
            continue
        cluster_config.update_server_conf(server, 'enable_cluster_checkout', False)
    if not success:
        stdio.stop_loading('fail')
        return
    # Locate the observer component this proxy fronts (EE first, then CE).
    ob_cluster_config = None
    for comp in ['oceanbase', 'oceanbase-ce']:
        if comp in deploy_config.components:
            ob_cluster_config = deploy_config.components[comp]
            break
    if ob_cluster_config:
        root_servers = {}
        cluster_name = ob_cluster_config.get_global_conf().get('appname')
        for server in ob_cluster_config.servers:
            config = ob_cluster_config.get_server_conf_with_default(server)
            zone = config['zone']
            cluster_name = cluster_name if cluster_name else config.get('appname')
            # Keep only the first observer seen per zone as its root server.
            if zone not in root_servers:
                root_servers[zone] = '%s:%s' % (server.ip, config['mysql_port'])
        rs_list = ';'.join([root_servers[zone] for zone in root_servers])
        cluster_name = cluster_name if cluster_name else 'obcluster'
        for server in cluster_config.servers:
            server_config = cluster_config.get_server_conf(server)
            # Only fill values the user did not provide.
            if not server_config.get('rs_list'):
                cluster_config.update_server_conf(server, 'rs_list', rs_list, False)
            if not server_config.get('cluster_name'):
                cluster_config.update_server_conf(server, 'cluster_name', cluster_name, False)
    stdio.stop_loading('succeed')
    return plugin_context.return_true()
\ No newline at end of file
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import re
def parse_size(size):
    """Convert a human-readable size such as '5G' or '512M' to bytes.

    Args:
        size: an int (or purely numeric string), returned as-is, or a
            string with a B/K/M/G/T unit suffix (case-insensitive).

    Returns:
        int: the size in bytes.

    Raises:
        ValueError: if the string cannot be parsed as a size.
    """
    if not isinstance(size, str) or size.isdigit():
        return int(size)
    units = {"B": 1, "K": 1 << 10, "M": 1 << 20, "G": 1 << 30, "T": 1 << 40}
    # Fixed: the original class '[B,K,M,G,T]' also matched a literal ','.
    match = re.match(r'([1-9][0-9]*)\s*([BKMGT])', size.upper())
    if match is None:
        raise ValueError('%r is not a valid size' % size)
    return int(match.group(1)) * units[match.group(2)]
def formate_size(size, precision=1):
    """Render a byte count as a human-readable string (e.g. 1536 -> '1.5K').

    Args:
        size: number of bytes.
        precision: decimal places for the value; 0 switches to integer
            ('%d') formatting.

    Returns:
        str: size scaled to the largest fitting unit, suffixed B/K/M/G/T/P.
    """
    units = ['B', 'K', 'M', 'G', 'T', 'P']
    idx = 0
    if precision:
        div = 1024.0
        formate = '%.' + str(precision) + 'f%s'
    else:
        div = 1024
        formate = '%d%s'
    while idx < 5 and size >= 1024:
        size /= div  # fixed: original divided by a hard-coded 1024.0, leaving div dead
        idx += 1
    return formate % (size, units[idx])
def create_tenant(plugin_context, cursor, *args, **kwargs):
    """Create an OceanBase tenant together with its resource unit and pool.

    Tenant parameters are read from ``plugin_context.options`` (with
    defaults), the free cpu/memory/disk of the target zones is probed from
    the sys tenant's internal tables, and then ``create resource unit`` /
    ``create resource pool`` / ``create tenant`` are issued via *cursor*.

    Args:
        plugin_context: OBD plugin context (cluster_config, stdio, options).
        cursor: DB-API style cursor connected to the cluster's sys tenant.

    Returns:
        ``plugin_context.return_true()`` on success, ``None`` on failure.
    """
    def get_option(key, default=''):
        # Option lookup; falsy values ('' / 0 / None) also fall back to default.
        value = getattr(options, key, default)
        if not value:
            value = default
        return value
    def error(*arg, **kwargs):
        # Report an error and end the progress spinner with 'fail'.
        stdio.error(*arg, **kwargs)
        stdio.stop_loading('fail')
    def exception(*arg, **kwargs):
        # Report the active exception and end the progress spinner with 'fail'.
        stdio.exception(*arg, **kwargs)
        stdio.stop_loading('fail')
    cluster_config = plugin_context.cluster_config
    stdio = plugin_context.stdio
    options = plugin_context.options
    name = get_option('tenant_name', 'test')
    # Derived names for the resource unit / pool backing this tenant.
    unit_name = '%s_unit' % name
    pool_name = '%s_pool' % name
    stdio.start_loading('Create tenant %s' % name)
    # Abort early if a tenant with this name already exists.
    sql = "select tenant_name from oceanbase.gv$tenant where tenant_name = %s"
    try:
        stdio.verbose('execute sql: %s' % (sql % name))
        cursor.execute(sql, [name])
        if cursor.fetchone():
            error('Tenant %s already exists' % name)
            return
    except:
        exception('execute sql exception: %s' % (sql % name))
        return
    # Count active observers per zone; this caps unit_num below.
    zone_list = get_option('zone_list', set())
    zone_obs_num = {}
    sql = "select zone, count(*) num from oceanbase.__all_server where status = 'active' group by zone"
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
        res = cursor.fetchall()
        for row in res:
            zone_obs_num[str(row['zone'])] = row['num']
    except:
        exception('execute sql exception: %s' % sql)
        return
    if not zone_list:
        zone_list = zone_obs_num.keys()
    # zone_list may arrive as a ','/';'-separated string or as an iterable.
    if isinstance(zone_list, str):
        zones = zone_list.replace(';', ',').split(',')
    else:
        zones = zone_list
    # Render as a SQL tuple literal, e.g. "('z1','z2')".
    zone_list = "('%s')" % "','".join(zones)
    min_unit_num = min(zone_obs_num.items(),key=lambda x: x[1])[1]
    unit_num = get_option('unit_num', min_unit_num)
    if unit_num > min_unit_num:
        return error('resource pool unit num is bigger than zone server count')
    cpu_total = 0
    mem_total = 0
    disk_total = 0
    # Minimum free resources across the chosen zones.
    sql = "SELECT min(cpu_total) cpu_total, min(mem_total) mem_total, min(disk_total) disk_total FROM oceanbase.__all_virtual_server_stat where zone in %s"
    try:
        sql = sql % zone_list
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('execute sql exception: %s' % sql)
        return
    resource = cursor.fetchone()
    cpu_total = resource['cpu_total']
    mem_total = resource['mem_total']
    disk_total = resource['disk_total']
    # Collect unit configs of existing pools occupying the target zones so
    # their reservations can be subtracted from the free totals below.
    sql = 'select * from oceanbase.__all_resource_pool order by name'
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('execute sql exception: %s' % sql)
        return
    units_id = set()
    res = cursor.fetchall()
    for row in res:
        # Avoid a name clash with an existing pool by suffixing '1'.
        if str(row['name']) == unit_name:
            unit_name += '1'
        if row['tenant_id'] < 1:
            continue
        for zone in str(row['zone_list']).replace(';', ',').split(','):
            if zone in zones:
                units_id.add(row['unit_config_id'])
                break
    sql = 'select * from oceanbase.__all_unit_config order by name'
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('execute sql exception: %s' % sql)
        return
    res = cursor.fetchall()
    for row in res:
        # Also avoid clashing with existing unit config names.
        if str(row['name']) == unit_name:
            unit_name += '1'
        if row['unit_config_id'] in units_id:
            # Subtract resources already promised to other tenants' units.
            cpu_total -= row['max_cpu']
            mem_total -= row['max_memory']
            # disk_total -= row['max_disk_size']
    MIN_CPU = 2
    MIN_MEMORY = 1073741824
    MIN_DISK_SIZE = 536870912
    MIN_IOPS = 128
    MIN_SESSION_NUM = 64
    if cpu_total < MIN_CPU:
        return error('%s: resource not enough: cpu count less than %s' % (zone_list, MIN_CPU))
    if mem_total < MIN_MEMORY:
        return error('%s: resource not enough: memory less than %s' % (zone_list, formate_size(MIN_MEMORY)))
    if disk_total < MIN_DISK_SIZE:
        return error('%s: resource not enough: disk space less than %s' % (zone_list, formate_size(MIN_DISK_SIZE)))
    # Requested unit spec; unset options default to everything available.
    max_cpu = get_option('max_cpu', cpu_total)
    max_memory = parse_size(get_option('max_memory', mem_total))
    max_iops = get_option('max_iops', MIN_IOPS)
    max_disk_size = parse_size(get_option('max_disk_size', disk_total))
    max_session_num = get_option('max_session_num', MIN_SESSION_NUM)
    min_cpu = get_option('min_cpu', max_cpu)
    min_memory = parse_size(get_option('min_memory', max_memory))
    min_iops = get_option('min_iops', max_iops)
    # NOTE(review): the 'Avail/Need' pairs below print the requested value
    # first — the labels look swapped; confirm intended wording.
    if cpu_total < max_cpu:
        return error('resource not enough: cpu (Avail: %s, Need: %s)' % (max_cpu, cpu_total))
    if mem_total < max_memory:
        return error('resource not enough: memory (Avail: %s, Need: %s)' % (formate_size(max_memory), formate_size(mem_total)))
    if disk_total < max_disk_size:
        return error('resource not enough: disk space (Avail: %s, Need: %s)' % (formate_size(max_disk_size), formate_size(disk_total)))
    if max_iops < MIN_IOPS:
        return error('max_iops must greater than %d' % MIN_IOPS)
    if max_session_num < MIN_SESSION_NUM:
        return error('max_session_num must greater than %d' % MIN_SESSION_NUM)
    if max_cpu < min_cpu:
        return error('min_cpu must less then max_cpu')
    if max_memory < min_memory:
        return error('min_memory must less then max_memory')
    if max_iops < min_iops:
        return error('min_iops must less then max_iops')
    # Tenant-level options.
    zone_num = len(zones)
    charset = get_option('charset', '')
    collate = get_option('collate', '')
    replica_num = get_option('replica_num', zone_num)
    logonly_replica_num = get_option('logonly_replica_num', 0)
    tablegroup = get_option('tablegroup', '')
    primary_zone = get_option('primary_zone', 'RANDOM')
    locality = get_option('locality', '')
    variables = get_option('variables', '')
    if replica_num == 0:
        replica_num = zone_num
    elif replica_num > zone_num:
        return error('replica_num cannot be greater than zone num (%s)' % zone_num)
    if not primary_zone:
        primary_zone = 'RANDOM'
    if logonly_replica_num > replica_num:
        return error('logonly_replica_num cannot be greater than replica_num (%s)' % replica_num)
    # create resource unit
    sql = 'create resource unit %s max_cpu %.1f, max_memory %d, max_iops %d, max_disk_size %d, max_session_num %d, min_cpu %.1f, min_memory %d, min_iops %d'
    try:
        sql = sql % (unit_name, max_cpu, max_memory, max_iops, max_disk_size, max_session_num, min_cpu, min_memory, min_iops)
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('faild to crate unit, execute sql exception: %s' % sql)
        return
    # create resource pool
    sql = "create resource pool %s unit='%s', unit_num=%d, zone_list=%s" % (pool_name, unit_name, unit_num, zone_list)
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('faild to crate pool, execute sql exception: %s' % sql)
        return
    # create tenant
    sql = "create tenant %s replica_num=%d,zone_list=%s,primary_zone='%s',resource_pool_list=('%s')"
    sql = sql % (name, replica_num, zone_list, primary_zone, pool_name)
    if charset:
        sql += ", charset = '%s'" % charset
    if collate:
        sql += ", collate = '%s'" % collate
    if logonly_replica_num:
        sql += ", logonly_replica_num = %d" % logonly_replica_num
    if tablegroup:
        sql += ", default tablegroup ='%s'" % tablegroup
    if locality:
        sql += ", locality = '%s'" % locality
    # NOTE(review): no space before 'set' here, so the generated SQL reads
    # "...('x_pool')set ..." — confirm the server accepts this.
    if variables:
        sql += "set %s" % variables
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('faild to crate tenant, execute sql exception: %s' % sql)
        # NOTE(review): unlike earlier failures there is no return here, so
        # the spinner below still reports 'succeed' — confirm intended.
    stdio.stop_loading('succeed')
    return plugin_context.return_true()
\ No newline at end of file
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
def drop_tenant(plugin_context, cursor, *args, **kwargs):
    """Drop an OceanBase tenant and release its resource pool and unit.

    Reads ``tenant_name`` from ``plugin_context.options``; refuses to drop
    the ``sys`` tenant. After ``drop tenant ... FORCE`` succeeds, the
    backing resource pool and its unit config are dropped as well.

    Args:
        plugin_context: OBD plugin context (stdio, options).
        cursor: DB-API style cursor connected to the cluster's sys tenant.

    Returns:
        ``plugin_context.return_true()`` on success, ``None`` on failure
        (or when there is no pool/unit left to clean up).
    """
    def error(*arg, **kwargs):
        # Report an error and end the progress spinner with 'fail'.
        stdio.error(*arg, **kwargs)
        stdio.stop_loading('fail')
    def exception(*arg, **kwargs):
        # Report the active exception and end the progress spinner with 'fail'.
        stdio.exception(*arg, **kwargs)
        stdio.stop_loading('fail')
    cluster_config = plugin_context.cluster_config
    stdio = plugin_context.stdio
    options = plugin_context.options
    tenant_name = getattr(options, 'tenant_name', '')
    if not tenant_name:
        error('Please set tenant name')  # fixed typo: message read 'Pease'
        return
    elif tenant_name == 'sys':
        error('Prohibit deleting sys tenant')
        return
    stdio.start_loading('Drop tenant %s' % tenant_name)
    # Look the tenant up first to get its tenant_id.
    tenant = None
    sql = "select * from oceanbase.gv$tenant where tenant_name = %s"
    try:
        stdio.verbose('execute sql: %s' % (sql % tenant_name))
        cursor.execute(sql, [tenant_name])
        tenant = cursor.fetchone()
        if not tenant:
            error('No such Tenant %s' % tenant_name)
            return
    except:
        exception('execute sql exception: %s' % (sql % tenant_name))
        return
    # Drop the tenant, then its resource pool (if any).
    pool = None
    sql = "select * from oceanbase.__all_resource_pool where tenant_id = %d" % tenant['tenant_id']
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
        pool = cursor.fetchone()
        sql = "drop tenant %s FORCE" % tenant_name
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
        if not pool:
            return
        sql = "drop resource pool %s" % pool['name']
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('execute sql exception: %s' % sql)
        return
    # Finally drop the unit config the pool was using.
    sql = "select * from oceanbase.__all_unit_config where unit_config_id = %d" % pool['unit_config_id']
    try:
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
        unit = cursor.fetchone()
        if not unit:
            return
        sql = "drop resource unit %s" % unit['name']
        stdio.verbose('execute sql: %s' % sql)
        cursor.execute(sql)
    except:
        exception('execute sql exception: %s' % sql)
        return
    stdio.stop_loading('succeed')
    return plugin_context.return_true()
\ No newline at end of file
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import re, os
def parse_size(size):
    """Convert a human-readable size such as '5G' or '512M' to bytes.

    Args:
        size: an int (or purely numeric string), returned as-is, or a
            string with a B/K/M/G/T unit suffix (case-insensitive).

    Returns:
        int: the size in bytes.

    Raises:
        ValueError: if the string cannot be parsed as a size.
    """
    if not isinstance(size, str) or size.isdigit():
        return int(size)
    units = {"B": 1, "K": 1 << 10, "M": 1 << 20, "G": 1 << 30, "T": 1 << 40}
    # Fixed: the original class '[B,K,M,G,T]' also matched a literal ','.
    match = re.match(r'([1-9][0-9]*)\s*([BKMGT])', size.upper())
    if match is None:
        raise ValueError('%r is not a valid size' % size)
    return int(match.group(1)) * units[match.group(2)]
def format_size(size, precision=1):
    """Return *size* bytes as a short human-readable string (up to 'G').

    Args:
        size: number of bytes.
        precision: decimal places for the value; 0 switches to integer
            ('%d') formatting.

    Returns:
        str: the scaled size suffixed with B/K/M or G.
    """
    suffixes = ('B', 'K', 'M', 'G')
    last = len(suffixes) - 1
    if precision:
        divisor = 1024.0
        template = '%%.%df%%s' % precision
    else:
        divisor = 1024
        template = '%d%s'
    pos = 0
    while size >= 1024 and pos < last:
        size /= divisor
        pos += 1
    return template % (size, suffixes[pos])
def get_system_memory(memory_limit):
    """Derive the observer ``system_memory`` value from its memory limit.

    Reserves 50% of the limit at or below 64G, otherwise 40%, with a 4G
    floor, and renders the result via ``format_size(..., 0)``.
    """
    ratio = 0.5 if memory_limit <= (64 << 30) else 0.4
    reserved = max(4 << 30, memory_limit * ratio)
    return format_size(reserved, 0)
def generate_config(plugin_context, deploy_config, *args, **kwargs):
    """Fill in missing observer configuration with auto-detected defaults.

    For every server this probes the remote host through its ssh client
    (network device, /proc/meminfo, cpu count, df output) and writes derived
    settings (devname, syslog defaults, memory_limit, system_memory,
    cpu_count, datafile_size, clog thresholds) back into cluster_config,
    never overriding values the user provided.

    Args:
        plugin_context: OBD plugin context (cluster_config, clients, stdio).
        deploy_config: full deploy configuration; used to borrow obproxy's
            cluster_name as the default appname.

    Returns:
        ``plugin_context.return_true()`` when every server was configured,
        ``None`` otherwise.
    """
    cluster_config = plugin_context.cluster_config
    clients = plugin_context.clients
    stdio = plugin_context.stdio
    success = True
    stdio.start_loading('Generate observer configuration')
    # Default appname: borrow obproxy's cluster_name when available.
    if not cluster_config.get_global_conf().get('appname'):
        default_appname = 'obcluster'
        if 'obproxy' in deploy_config.components:
            obproxy_cluster_config = deploy_config.components['obproxy']
            cluster_name = obproxy_cluster_config.get_global_conf().get('cluster_name')
            if not cluster_name:
                for server in obproxy_cluster_config.servers:
                    server_config = obproxy_cluster_config.get_server_conf(server)
                    if server_config.get('cluster_name'):
                        default_appname = server_config['cluster_name']
                        break
        cluster_config.update_global_conf('appname', default_appname, False)
    MIN_MEMORY = 8 << 30  # 8G: minimum memory_limit for an observer
    for server in cluster_config.servers:
        ip = server.ip
        client = clients[server]
        server_config = cluster_config.get_server_conf_with_default(server)
        user_server_config = cluster_config.get_original_server_conf(server)
        if not server_config.get('home_path'):
            stdio.error("observer %s: missing configuration 'home_path' in configuration file" % server)
            success = False
            continue
        # Network device: 'lo' for local deploys, otherwise the first
        # interface (from /proc/net/dev) that can ping the server's ip.
        if user_server_config.get('devname') is None:
            if client.is_localhost():
                cluster_config.update_server_conf(server, 'devname', 'lo')
            else:
                devinfo = client.execute_command('cat /proc/net/dev').stdout
                interfaces = re.findall('\n\s+(\w+):', devinfo)
                for interface in interfaces:
                    if interface == 'lo':
                        continue
                    if client.execute_command('ping -W 1 -c 1 -I %s %s' % (interface, ip)):
                        cluster_config.update_server_conf(server, 'devname', interface)
                        break
        # syslog defaults (only filled when the user did not set them)
        max_syslog_file_count_default = 4
        if user_server_config.get('syslog_level') is None:
            cluster_config.update_server_conf(server, 'syslog_level', 'INFO', False)
        if user_server_config.get('enable_syslog_recycle') is None:
            cluster_config.update_server_conf(server, 'enable_syslog_recycle', True, False)
        if user_server_config.get('enable_syslog_wf') is None:
            cluster_config.update_server_conf(server, 'enable_syslog_wf', True, False)
        if user_server_config.get('max_syslog_file_count') is None:
            cluster_config.update_server_conf(server, 'max_syslog_file_count', max_syslog_file_count_default, False)
        if server_config.get('cluster_id') is None:
            cluster_config.update_server_conf(server, 'cluster_id', 1, False)
        # Directory layout: data under home_path/store, clog under redo_dir.
        dirs = {"home_path": server_config['home_path']}
        dirs["data_dir"] = server_config['data_dir'] if server_config.get('data_dir') else os.path.join(server_config['home_path'], 'store')
        dirs["redo_dir"] = server_config['redo_dir'] if server_config.get('redo_dir') else dirs["data_dir"]
        dirs["clog_dir"] = server_config['clog_dir'] if server_config.get('clog_dir') else os.path.join(dirs["redo_dir"], 'clog')
        # memory
        auto_set_memory = False
        if user_server_config.get('memory_limit_percentage'):
            # memory_limit derived as a percentage of the host's MemTotal.
            ret = client.execute_command('cat /proc/meminfo')
            if ret:
                total_memory = 0
                for k, v in re.findall('(\w+)\s*:\s*(\d+\s*\w+)', ret.stdout):
                    if k == 'MemTotal':
                        total_memory = parse_size(str(v))
                memory_limit = int(total_memory * user_server_config.get('memory_limit_percentage') / 100)
        else:
            if not server_config.get('memory_limit'):
                # No explicit limit: take 90% of the host's MemAvailable.
                ret = client.execute_command('cat /proc/meminfo')
                if ret:
                    free_memory = 0
                    for k, v in re.findall('(\w+)\s*:\s*(\d+\s*\w+)', ret.stdout):
                        if k == 'MemAvailable':
                            free_memory = parse_size(str(v))
                    memory_limit = free_memory
                    if memory_limit < MIN_MEMORY:
                        # NOTE(review): 'errorn' looks like a typo for 'error';
                        # confirm the stdio API actually provides it.
                        stdio.errorn('(%s) not enough memory. (Free: %s, Need: %s)' % (ip, format_size(free_memory), format_size(MIN_MEMORY)))
                        success = False
                        continue
                    memory_limit = max(MIN_MEMORY, memory_limit * 0.9)
                    server_config['memory_limit'] = format_size(memory_limit, 0)
                    cluster_config.update_server_conf(server, 'memory_limit', server_config['memory_limit'], False)
                else:
                    stdio.error("%s: fail to get memory info.\nPlease configure 'memory_limit' manually in configuration file")
                    success = False
                    continue
            else:
                # User supplied memory_limit: just parse/validate it.
                try:
                    memory_limit = parse_size(server_config.get('memory_limit'))
                except:
                    stdio.error('memory_limit must be an integer')
                    return
            # Not percentage-pinned, so the disk logic below may shrink it.
            auto_set_memory = True
        auto_set_system_memory = False
        if not user_server_config.get('system_memory'):
            auto_set_system_memory = True
            cluster_config.update_server_conf(server, 'system_memory', get_system_memory(memory_limit), False)
        # cpu
        if not user_server_config.get('cpu_count'):
            ret = client.execute_command("grep -e 'processor\s*:' /proc/cpuinfo | wc -l")
            if ret and ret.stdout.isdigit():
                cpu_num = int(ret.stdout)
                server_config['cpu_count'] = max(16, int(cpu_num * 0.8))
            else:
                server_config['cpu_count'] = 16
            # Floor of 16 cpus regardless of what was detected.
            cluster_config.update_server_conf(server, 'cpu_count', max(16, server_config['cpu_count']), False)
        # disk
        if not user_server_config.get('datafile_size') or not user_server_config.get('datafile_disk_percentage'):
            disk = {'/': 0}
            ret = client.execute_command('df --output=size,avail,target')
            if ret:
                for total, avail, path in re.findall('(\d+)\s+(\d+)\s+(.+)', ret.stdout):
                    disk[path] = {
                        'total': int(total) << 10,
                        'avail': int(avail) << 10,
                        'need': 0,
                    }
            # Map each directory to its deepest containing mount point.
            mounts = {}
            for key in dirs:
                path = dirs[key]
                kp = '/'
                for p in disk:
                    if p in path:
                        if len(p) > len(kp):
                            kp = p
                mounts[path] = kp
            data_dir_mount = mounts[dirs['data_dir']]
            data_dir_disk = disk[data_dir_mount]
            clog_dir_mount = mounts[dirs['clog_dir']]
            clog_dir_disk = disk[clog_dir_mount]  # NOTE(review): unused below — confirm
            if clog_dir_mount == data_dir_mount:
                # Data files and clog share one disk: budget both, plus
                # headroom for observer logs when home_path shares it too.
                clog_disk_utilization_threshold_max = 95
                disk_free = data_dir_disk['avail']
                real_disk_total = data_dir_disk['total']
                if mounts[dirs['home_path']] == data_dir_mount:
                    if user_server_config.get('enable_syslog_recycle') is False:
                        log_size = real_disk_total * 0.1
                    else:
                        log_size = (256 << 20) * user_server_config.get('max_syslog_file_count', max_syslog_file_count_default) * 4
                else:
                    log_size = 0
                clog_padding_size = int(real_disk_total * (1 - clog_disk_utilization_threshold_max / 100.0 * 0.8))
                padding_size = clog_padding_size + log_size
                disk_total = real_disk_total - padding_size
                disk_used = real_disk_total - disk_free
                clog_disk_size = memory_limit * 4
                min_data_file_size = memory_limit * 3
                clog_size = int(round(clog_disk_size * 0.64))
                min_need = padding_size + clog_disk_size + min_data_file_size
                disk_flag = False
                if min_need > disk_free:
                    # Not enough space: when memory was auto-derived, try
                    # shrinking memory_limit until the 7x footprint fits.
                    if auto_set_memory:
                        if auto_set_system_memory:
                            min_size = MIN_MEMORY * 7
                        else:
                            min_size = max(MIN_MEMORY, parse_size(user_server_config.get('system_memory')) * 2) * 7
                        min_need = padding_size + min_size
                        if min_need <= disk_free:
                            memory_limit = (disk_free - padding_size) / 7
                            server_config['memory_limit'] = format_size(memory_limit, 0)
                            cluster_config.update_server_conf(server, 'memory_limit', server_config['memory_limit'], False)
                            memory_limit = parse_size(server_config['memory_limit'])
                            clog_disk_size = memory_limit * 4
                            clog_size = int(round(clog_disk_size * 0.64))
                            if auto_set_system_memory:
                                cluster_config.update_server_conf(server, 'system_memory', get_system_memory(memory_limit), False)
                            disk_flag = True
                else:
                    disk_flag = True
                if not disk_flag:
                    # NOTE(review): 'kp' here is left over from the mount-point
                    # loop above and may not be the relevant mount — confirm.
                    stdio.error('(%s) %s not enough disk space. (Avail: %s, Need: %s). Use `redo_dir` to set other disk for clog' % (ip, kp, format_size(disk_free), format_size(min_need)))
                    success = False
                    continue
                datafile_size_format = format_size(disk_total - clog_disk_size - disk_used, 0)
                datafile_size = parse_size(datafile_size_format)
                clog_disk_utilization_threshold = max(80, int(100.0 * (disk_used + datafile_size + padding_size + clog_disk_size * 0.8) / real_disk_total))
                clog_disk_utilization_threshold = min(clog_disk_utilization_threshold, clog_disk_utilization_threshold_max)
                clog_disk_usage_limit_percentage = min(int(clog_disk_utilization_threshold / 80.0 * 95), 98)
                cluster_config.update_server_conf(server, 'datafile_size', datafile_size_format, False)
                cluster_config.update_server_conf(server, 'clog_disk_utilization_threshold', clog_disk_utilization_threshold, False)
                cluster_config.update_server_conf(server, 'clog_disk_usage_limit_percentage', clog_disk_usage_limit_percentage, False)
            else:
                # clog on its own disk: give data files 80% of what's free.
                datafile_size = max(5 << 30, data_dir_disk['avail'] * 0.8, 0)
                cluster_config.update_server_conf(server, 'datafile_size', format_size(datafile_size, 0), False)
    if success:
        stdio.stop_loading('succeed')
        return plugin_context.return_true()
    stdio.stop_loading('fail')
\ No newline at end of file
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import re
import os
from time import sleep
try:
import subprocess32 as subprocess
except:
import subprocess
from ssh import LocalClient
stdio = None
def parse_size(size):
    """Convert a human-readable size such as '5G' or '512M' to bytes.

    Args:
        size: an int (or purely numeric string), returned as-is, or a
            string with a B/K/M/G/T unit suffix (case-insensitive).

    Returns:
        int: the size in bytes.

    Raises:
        ValueError: if the string cannot be parsed as a size.
    """
    if not isinstance(size, str) or size.isdigit():
        return int(size)
    units = {"B": 1, "K": 1 << 10, "M": 1 << 20, "G": 1 << 30, "T": 1 << 40}
    # Fixed: the original class '[B,K,M,G,T]' also matched a literal ','.
    match = re.match(r'([1-9][0-9]*)\s*([BKMGT])', size.upper())
    if match is None:
        raise ValueError('%r is not a valid size' % size)
    return int(match.group(1)) * units[match.group(2)]
def format_size(size, precision=1):
    """Return *size* bytes as a short human-readable string (up to 'G').

    Args:
        size: number of bytes.
        precision: decimal places for the value; 0 switches to integer
            ('%d') formatting.

    Returns:
        str: the scaled size suffixed with B/K/M or G.
    """
    suffixes = ('B', 'K', 'M', 'G')
    last = len(suffixes) - 1
    if precision:
        divisor = 1024.0
        template = '%%.%df%%s' % precision
    else:
        divisor = 1024
        template = '%d%s'
    pos = 0
    while size >= 1024 and pos < last:
        size /= divisor
        pos += 1
    return template % (size, suffixes[pos])
def exec_cmd(cmd):
    """Run *cmd* in a shell, streaming its combined stdout/stderr via stdio.

    Args:
        cmd: shell command line (executed with shell=True).

    Returns:
        bool: True when the command exits with status 0.
    """
    stdio.verbose('execute: %s' % cmd)
    process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Read until EOF rather than until poll() is non-None: the original loop
    # could drop output still buffered in the pipe when the process exited.
    while True:
        line = process.stdout.readline()
        if not line:
            break
        line = line.strip()
        if line:
            stdio.print(line.decode("utf8", 'ignore'))
    return process.wait() == 0
def run_test(plugin_context, db, cursor, odp_db, odp_cursor=None, *args, **kwargs):
    """Run a sysbench benchmark against an OceanBase tenant.

    Validates the obclient/sysbench binaries, the target tenant and user,
    optionally tunes obproxy and observer/tenant parameters for the run,
    then executes ``sysbench cleanup / prepare / run``. Every setting that
    was changed is restored to its previous value in the ``finally`` block,
    in reverse order.

    :param plugin_context: plugin context (cluster_config / stdio / options)
    :param db: observer DB connection (cursor below is used for queries)
    :param cursor: cursor connected to the observer sys tenant
    :param odp_db: obproxy DB connection
    :param odp_cursor: optional cursor connected to obproxy
    :return: ``plugin_context.return_true()`` on success, ``None`` on failure
    """
    def get_option(key, default=''):
        # Fall back to `default` both when the option is absent and when it
        # is present but falsy (empty string / 0 / None).
        value = getattr(options, key, default)
        if not value:
            value = default
        return value
    def execute(cursor, query, args=None):
        # Execute one SQL statement and return the first row; logs and
        # re-raises on failure so callers can simply `except: return`.
        msg = query % tuple(args) if args is not None else query
        stdio.verbose('execute sql: %s' % msg)
        # stdio.verbose("query: %s. args: %s" % (query, args))
        try:
            cursor.execute(query, args)
            return cursor.fetchone()
        except:
            msg = 'execute sql exception: %s' % msg
            stdio.exception(msg)
            raise Exception(msg)
    global stdio
    cluster_config = plugin_context.cluster_config
    stdio = plugin_context.stdio
    options = plugin_context.options
    optimization = get_option('optimization', 1) > 0
    host = get_option('host', '127.0.0.1')
    port = get_option('port', 2881)
    mysql_db = get_option('database', 'test')
    user = get_option('user', 'root')
    tenant_name = get_option('tenant', 'test')
    password = get_option('password', '')
    table_size = get_option('table_size', 10000)
    tables = get_option('tables', 32)
    threads = get_option('threads', 150)
    time = get_option('time', 60)
    interval = get_option('interval', 10)
    events = get_option('events', 0)
    rand_type = get_option('rand_type', None)
    skip_trx = get_option('skip_trx', '').lower()
    percentile = get_option('percentile', None)
    script_name = get_option('script_name', 'point_select.lua')
    obclient_bin = get_option('obclient_bin', 'obclient')
    sysbench_bin = get_option('sysbench_bin', 'sysbench')
    sysbench_script_dir = get_option('sysbench_script_dir', '/usr/sysbench/share/sysbench')
    # Sanity-check that both client binaries are runnable before touching
    # any cluster configuration.
    ret = LocalClient.execute_command('%s --help' % obclient_bin, stdio=stdio)
    if not ret:
        stdio.error('%s\n%s is not an executable file. Please use `--obclient-bin` to set.\nYou may not have obclient installed' % (ret.stderr, obclient_bin))
        return
    ret = LocalClient.execute_command('%s --help' % sysbench_bin, stdio=stdio)
    if not ret:
        stdio.error('%s\n%s is not an executable file. Please use `--sysbench-bin` to set.\nYou may not have ob-sysbench installed' % (ret.stderr, sysbench_bin))
        return
    if not script_name.endswith('.lua'):
        script_name += '.lua'
    script_path = os.path.join(sysbench_script_dir, script_name)
    if not os.path.exists(script_path):
        stdio.error('No such file %s. Please use `--sysbench-script-dir` to set sysbench scrpit dir.\nYou may not have ob-sysbench installed' % script_path)
        return
    sql = "select * from oceanbase.gv$tenant where tenant_name = %s"
    max_cpu = 2
    tenant_meta = None
    try:
        # The query itself is parameterized; `sql % tenant_name` here only
        # renders the statement for the verbose log.
        stdio.verbose('execute sql: %s' % (sql % tenant_name))
        cursor.execute(sql, [tenant_name])
        tenant_meta = cursor.fetchone()
        if not tenant_meta:
            stdio.error('Tenant %s not exists. Use `obd cluster tenant create` to create tenant.' % tenant_name)
            return
        sql = "select * from oceanbase.__all_resource_pool where tenant_id = %d" % tenant_meta['tenant_id']
        pool = execute(cursor, sql)
        sql = "select * from oceanbase.__all_unit_config where unit_config_id = %d" % pool['unit_config_id']
        max_cpu = execute(cursor, sql)['max_cpu']
    except:
        # execute() has already logged the exception.
        return
    sql = "select * from oceanbase.__all_user where user_name = %s"
    try:
        stdio.verbose('execute sql: %s' % (sql % user))
        cursor.execute(sql, [user])
        if not cursor.fetchone():
            stdio.error('User %s not exists.' % user)
            return
    except:
        return
    # BUGFIX: the password fragment was previously the literal "-p'%s'"
    # (the `% password` substitution was missing), so any non-empty
    # password produced a broken obclient command line.
    exec_sql_cmd = "%s -h%s -P%s -u%s@%s %s -A -e" % (obclient_bin, host, port, user, tenant_name, ("-p'%s'" % password) if password else '')
    ret = LocalClient.execute_command('%s "%s"' % (exec_sql_cmd, 'select version();'), stdio=stdio)
    if not ret:
        stdio.error(ret.stderr)
        return
    sql = ''
    odp_configs = [
        # [config name, new value, old value, replace condition: lambda n, o: n != o]
        ['enable_compression_protocol', False, False, lambda n, o: n != o],
        ['proxy_mem_limited', format_size(min(max(threads * (8 << 10), 2 << 30), 4 << 30), 0), 0, lambda n, o: parse_size(n) > parse_size(o)],
        ['enable_prometheus', False, False, lambda n, o: n != o],
        ['enable_metadb_used', False, False, lambda n, o: n != o],
        ['enable_standby', False, False, lambda n, o: n != o],
        ['enable_strict_stat_time', False, False, lambda n, o: n != o],
        ['use_local_dbconfig', True, True, lambda n, o: n != o],
    ]
    system_configs = [
        # [config name, new value, old value, replace condition, is tenant-level]
        ['enable_auto_leader_switch', False, False, lambda n, o: n != o, False],
        ['enable_one_phase_commit', False, False, lambda n, o: n != o, False],
        ['weak_read_version_refresh_interval', '5s', '5s', lambda n, o: n != o, False],
        ['syslog_level', 'PERF', 'PERF', lambda n, o: n != o, False],
        ['max_syslog_file_count', 100, 100, lambda n, o: n != o, False],
        ['enable_syslog_recycle', True, True, lambda n, o: n != o, False],
        ['trace_log_slow_query_watermark', '10s', '10s', lambda n, o: n != o, False],
        ['large_query_threshold', '1s', '1s', lambda n, o: n != o, False],
        ['clog_sync_time_warn_threshold', '200ms', '200ms', lambda n, o: n != o, False],
        ['syslog_io_bandwidth_limit', '10M', '10M', lambda n, o: n != o, False],
        ['enable_sql_audit', False, False, lambda n, o: n != o, False],
        # Pseudo-entry: pause to let the settings above take effect.
        ['sleep', 1],
        ['enable_perf_event', False, False, lambda n, o: n != o, False],
        ['clog_max_unconfirmed_log_count', 5000, 5000, lambda n, o: n != o, False],
        ['autoinc_cache_refresh_interval', '86400s', '86400s', lambda n, o: n != o, False],
        ['enable_early_lock_release', False, False, lambda n, o: n != o, True],
        ['default_compress_func', 'lz4_1.0', 'lz4_1.0', lambda n, o: n != o, False],
        ['_clog_aggregation_buffer_amount', 4, 4, lambda n, o: n != o, False],
        ['_flush_clog_aggregation_buffer_timeout', '1ms', '1ms', lambda n, o: n != o, False],
    ]
    if odp_cursor and optimization:
        try:
            for config in odp_configs:
                sql = 'show proxyconfig like "%s"' % config[0]
                ret = execute(odp_cursor, sql)
                if ret:
                    # Remember the current value so it can be restored later.
                    config[2] = ret['value']
                    if config[3](config[1], config[2]):
                        sql = 'alter proxyconfig set %s=%%s' % config[0]
                        execute(odp_cursor, sql, [config[1]])
        except:
            return
    tenant_q = ' tenant="%s"' % tenant_name
    server_num = len(cluster_config.servers)
    if optimization:
        try:
            for config in system_configs:
                if config[0] == 'sleep':
                    sleep(config[1])
                    continue
                sql = 'show parameters like "%s"' % config[0]
                if config[4]:
                    sql += tenant_q
                ret = execute(cursor, sql)
                if ret:
                    config[2] = ret['value']
                    if config[3](config[1], config[2]):
                        sql = 'alter system set %s=%%s' % config[0]
                        if config[4]:
                            sql += tenant_q
                        execute(cursor, sql, [config[1]])
            sql = "select count(1) server_num from oceanbase.__all_server where status = 'active'"
            ret = execute(cursor, sql)
            if ret:
                server_num = ret.get("server_num", server_num)
        except:
            return
    # Parallelism targets scale with the tenant's CPU quota and server count.
    parallel_max_servers = max_cpu * 10
    parallel_servers_target = int(parallel_max_servers * server_num * 0.8)
    tenant_variables = [
        # [variable name, new value, old value, replace condition: lambda n, o: n != o]
        ['ob_timestamp_service', 1, 1, lambda n, o: n != o],
        ['autocommit', 1, 1, lambda n, o: n != o],
        ['ob_query_timeout', 36000000000, 36000000000, lambda n, o: n != o],
        ['ob_trx_timeout', 36000000000, 36000000000, lambda n, o: n != o],
        ['max_allowed_packet', 67108864, 67108864, lambda n, o: n != o],
        ['ob_sql_work_area_percentage', 100, 100, lambda n, o: n != o],
        ['parallel_max_servers', parallel_max_servers, parallel_max_servers, lambda n, o: n != o],
        ['parallel_servers_target', parallel_servers_target, parallel_servers_target, lambda n, o: n != o]
    ]
    select_sql_t = "select value from oceanbase.__all_virtual_sys_variable where tenant_id = %d and name = '%%s'" % tenant_meta['tenant_id']
    update_sql_t = "ALTER TENANT %s SET VARIABLES %%s = %%%%s" % tenant_name
    try:
        for config in tenant_variables:
            sql = select_sql_t % config[0]
            ret = execute(cursor, sql)
            if ret:
                value = ret['value']
                # BUGFIX: the connective was `or`, which called .isdigit()
                # on non-string values (AttributeError) and int() on
                # non-numeric strings (ValueError). `and` only converts
                # digit-only strings and leaves everything else as-is.
                config[2] = int(value) if isinstance(value, str) and value.isdigit() else value
                if config[3](config[1], config[2]):
                    sql = update_sql_t % config[0]
                    execute(cursor, sql, [config[1]])
    except:
        return
    sysbench_cmd = "cd %s; %s %s --mysql-host=%s --mysql-port=%s --mysql-user=%s@%s --mysql-db=%s" % (sysbench_script_dir, sysbench_bin, script_name, host, port, user, tenant_name, mysql_db)
    if password:
        sysbench_cmd += ' --mysql-password=%s' % password
    if table_size:
        sysbench_cmd += ' --table_size=%s' % table_size
    if tables:
        sysbench_cmd += ' --tables=%s' % tables
    if threads:
        sysbench_cmd += ' --threads=%s' % threads
    if time:
        sysbench_cmd += ' --time=%s' % time
    if interval:
        sysbench_cmd += ' --report-interval=%s' % interval
    if events:
        sysbench_cmd += ' --events=%s' % events
    if rand_type:
        sysbench_cmd += ' --rand-type=%s' % rand_type
    if skip_trx in ['on', 'off']:
        sysbench_cmd += ' --skip_trx=%s' % skip_trx
    if percentile:
        sysbench_cmd += ' --percentile=%s' % percentile
    try:
        if exec_cmd('%s cleanup' % sysbench_cmd) and exec_cmd('%s prepare' % sysbench_cmd) and exec_cmd('%s --db-ps-mode=disable run' % sysbench_cmd):
            return plugin_context.return_true()
    except KeyboardInterrupt:
        pass
    except:
        stdio.exception('')
    finally:
        # Best-effort restore of every tuned setting, in reverse order of
        # application; failures here are deliberately swallowed.
        try:
            if optimization:
                for config in tenant_variables[::-1]:
                    if config[3](config[1], config[2]):
                        sql = update_sql_t % config[0]
                        execute(cursor, sql, [config[2]])
                for config in system_configs[::-1]:
                    if config[0] == 'sleep':
                        sleep(config[1])
                        continue
                    if config[3](config[1], config[2]):
                        sql = 'alter system set %s=%%s' % config[0]
                        if config[4]:
                            sql += tenant_q
                        execute(cursor, sql, [config[2]])
                if odp_cursor:
                    for config in odp_configs[::-1]:
                        if config[3](config[1], config[2]):
                            sql = 'alter proxyconfig set %s=%%s' % config[0]
                            execute(odp_cursor, sql, [config[2]])
        except:
            pass
......@@ -6,20 +6,24 @@ fi
function _obd_complete_func
{
local cur prev cmd obd_cmd cluster_cmd mirror_cmd test_cmd
local cur prev cmd obd_cmd cluster_cmd tenant_cmd mirror_cmd test_cmd
COMPREPLY=()
cur="${COMP_WORDS[COMP_CWORD]}"
prev="${COMP_WORDS[COMP_CWORD-1]}"
obd_cmd="mirror cluster test update"
cluster_cmd="start deploy redeploy restart reload destroy stop edit-config list display upgrade"
obd_cmd="mirror cluster test update repo"
cluster_cmd="autodeploy tenant start deploy redeploy restart reload destroy stop edit-config list display upgrade"
tenant_cmd="create drop"
mirror_cmd="clone create list update"
test_cmd="mysqltest"
repo_cmd="list"
test_cmd="mysqltest sysbench"
if [[ ${cur} == * ]] ; then
case "${prev}" in
obd);&
test);&
cluster);&
mirror)
tenant);&
mirror);&
repo)
cmd=$(eval echo \$"${prev}_cmd")
COMPREPLY=( $(compgen -W "${cmd}" -- ${cur}) )
;;
......@@ -32,6 +36,18 @@ function _obd_complete_func
compopt -o nospace
COMPREPLY=( $(compgen -o filenames -W "${res}" -- ${cur}) )
;;
*)
if [ "$prev" == "list" ]; then
return 0
else
prev="${COMP_WORDS[COMP_CWORD-2]}"
obd_home=${OBD_HOME:-~/.obd}
if [[ "$prev" == "cluster" || "$prev" == "test" || "$prev" == "tenant" ]]; then
res=`ls -p $obd_home/cluster 2>/dev/null | sed "s#/##"`
compopt -o nospace
COMPREPLY=( $(compgen -o filenames -W "${res}" -- ${cur}) )
fi
fi
esac
return 0
fi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册