Unverified commit 5b551d65, authored by Rongfeng Fu, committed by GitHub

v1.1 (#28)

* 1.1

* auto deploy example

* update sysbench script name
Parent f42a7180
This diff is collapsed.
@@ -142,7 +142,9 @@ class ClusterConfig(object):
     def get_unconfigured_require_item(self, server):
         items = []
         config = self.get_server_conf(server)
-        for key in self._default_conf:
+        for key in self._temp_conf:
+            if not self._temp_conf[key].require:
+                continue
             if key in config:
                 continue
             items.append(key)
@@ -176,7 +178,7 @@ class ClusterConfig(object):
         self._default_conf = {}
         self._temp_conf = temp_conf
         for key in self._temp_conf:
-            if self._temp_conf[key].require:
+            if self._temp_conf[key].require and self._temp_conf[key].default is not None:
                 self._default_conf[key] = self._temp_conf[key].default
         self.set_global_conf(self._global_conf)  # refresh the global configuration
@@ -204,6 +206,9 @@ class ClusterConfig(object):
             self._cache_server[server] = conf
         return self._cache_server[server]
 
+    def get_original_server_conf(self, server):
+        return self._server_conf.get(server)
+
 
 class DeployStatus(Enum):
@@ -246,6 +251,7 @@ class DeployConfig(object):
     def __init__(self, yaml_path, yaml_loader=yaml):
         self._user = None
         self.unuse_lib_repository = False
+        self.auto_create_tenant = False
         self.components = {}
         self._src_data = None
         self.yaml_path = yaml_path
@@ -263,6 +269,13 @@
             return self._dump()
         return True
 
+    def set_auto_create_tenant(self, status):
+        if self.auto_create_tenant != status:
+            self.auto_create_tenant = status
+            self._src_data['auto_create_tenant'] = status
+            return self._dump()
+        return True
+
     def _load(self):
         try:
             with open(self.yaml_path, 'rb') as f:
@@ -278,7 +291,9 @@
                     ))
                 elif key == 'unuse_lib_repository':
                     self.unuse_lib_repository = self._src_data['unuse_lib_repository']
-                else:
+                elif key == 'auto_create_tenant':
+                    self.auto_create_tenant = self._src_data['auto_create_tenant']
+                elif issubclass(type(self._src_data[key]), dict):
                     self._add_component(key, self._src_data[key])
         except:
             pass
......
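The new `set_auto_create_tenant` setter persists the flag back into the deploy YAML through `_dump()`, so the setting survives across obd invocations. A minimal usage sketch, with the caveat that `DeployConfig` objects normally come from OBD's deploy manager and the path below is only a placeholder:

```python
# Hypothetical usage; the YAML path is a placeholder, not an OBD convention.
deploy_config = DeployConfig('/tmp/demo/config.yaml')

# set_auto_create_tenant updates the in-memory flag and, when the value
# actually changes, writes auto_create_tenant back to the YAML via _dump().
if deploy_config.set_auto_create_tenant(True):
    print(deploy_config.auto_create_tenant)  # True
```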
@@ -299,6 +299,18 @@ class Repository(PackageInfo):
             return True
 
+
+class RepositoryVO(object):
+
+    def __init__(self, name, version, release, arch, md5, path, tags=[]):
+        self.name = name
+        self.version = version
+        self.release = release
+        self.arch = arch
+        self.md5 = md5
+        self.path = path
+        self.tags = tags
+
 
 class ComponentRepository(object):
 
     def __init__(self, name, repository_dir, stdio=None):
@@ -368,6 +380,15 @@ class ComponentRepository(object):
             return self.get_repository_by_version(version, tag)
         return None
 
+    def get_repositories(self, version=None):
+        if not version:
+            version = '*'
+        repositories = []
+        path_partten = os.path.join(self.repository_dir, version, '*')
+        for path in glob(path_partten):
+            repositories.append(Repository(self.name, path, self.stdio))
+        return repositories
+
 
 class RepositoryManager(Manager):
@@ -379,21 +400,52 @@ class RepositoryManager(Manager):
         self.repositories = {}
         self.component_repositoies = {}
 
-    def get_repositoryies(self, name):
-        repositories = {}
-        path_partten = os.path.join(self.path, name, '*')
-        for path in glob(path_partten):
-            _, version = os.path.split(path)
-            Repository = Repository(name, path, version, self.stdio)
+    def _get_repository_vo(self, repository):
+        return RepositoryVO(
+            repository.name,
+            repository.version,
+            repository.release,
+            repository.arch,
+            repository.md5,
+            repository.repository_dir,
+            []
+        )
+
+    def get_repositories_view(self, name=None):
+        if name:
+            repositories = self.get_component_repositoy(name).get_repositories()
+        else:
+            repositories = []
+            path_partten = os.path.join(self.path, '*')
+            for path in glob(path_partten):
+                _, name = os.path.split(path)
+                repositories += self.get_component_repositoy(name).get_repositories()
+
+        repositories_vo = {}
+        for repository in repositories:
+            if repository.is_shadow_repository():
+                repository_ist = self.get_instance_repository_from_shadow(repository)
+                if repository_ist not in repositories_vo:
+                    repositories_vo[repository_ist] = self._get_repository_vo(repository)
+                _, tag = os.path.split(repository.repository_dir)
+                repositories_vo[repository_ist].tags.append(tag)
+            elif repository not in repositories_vo:
+                repositories_vo[repository] = self._get_repository_vo(repository)
+        return list(repositories_vo.values())
+
+    def get_component_repositoy(self, name):
+        if name not in self.component_repositoies:
+            path = os.path.join(self.path, name)
+            self.component_repositoies[name] = ComponentRepository(name, path, self.stdio)
+        return self.component_repositoies[name]
 
     def get_repository_by_version(self, name, version, tag=None, instance=True):
         if not tag:
             tag = name
         path = os.path.join(self.path, name, version, tag)
         if path not in self.repositories:
-            if name not in self.component_repositoies:
-                self.component_repositoies[name] = ComponentRepository(name, os.path.join(self.path, name), self.stdio)
-            repository = self.component_repositoies[name].get_repository(version, tag)
+            component_repositoy = self.get_component_repositoy(name)
+            repository = component_repositoy.get_repository(version, tag)
             if repository:
                 self.repositories[repository.repository_dir] = repository
                 self.repositories[path] = repository
@@ -404,10 +456,9 @@ class RepositoryManager(Manager):
     def get_repository(self, name, version=None, tag=None, instance=True):
         if version:
             return self.get_repository_by_version(name, version, tag)
-        if name not in self.component_repositoies:
-            path = os.path.join(self.path, name)
-            self.component_repositoies[name] = ComponentRepository(name, path, self.stdio)
-        repository = self.component_repositoies[name].get_repository(version, tag)
+        component_repositoy = self.get_component_repositoy(name)
+        repository = component_repositoy.get_repository(version, tag)
         if repository:
             self.repositories[repository.repository_dir] = repository
         return self.get_instance_repository_from_shadow(repository) if repository and instance else repository
......
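The interesting part of `get_repositories_view` is how it folds shadow repositories (tag symlinks) into their backing instance repository, accumulating every tag on a single `RepositoryVO`. The rule in isolation, as a sketch that assumes `manager` is a `RepositoryManager` and `repositories` a list of `Repository` objects:

```python
import os

view = {}
for repo in repositories:
    if repo.is_shadow_repository():
        # A shadow repo is a tag pointing at an instance repo: key the
        # view by the instance and collect the shadow's directory name.
        inst = manager.get_instance_repository_from_shadow(repo)
        vo = view.setdefault(inst, manager._get_repository_vo(repo))
        vo.tags.append(os.path.basename(repo.repository_dir))
    elif repo not in view:
        view[repo] = manager._get_repository_vo(repo)

# One VO per instance repository, with tags merged from all its shadows.
repositories_vo = list(view.values())
```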
@@ -445,7 +445,7 @@ class IO(object):
                 self.error(msg)
             else:
                 msg and self.error(msg)
-            self._log(MsgLevel.VERBOSE, '\n'.join(exception_msg))
+            self._log(MsgLevel.ERROR, '\n'.join(exception_msg))
     else:
         def exception(self, msg, *args, **kwargs):
             ei = sys.exc_info()
@@ -462,5 +462,5 @@ class IO(object):
                 self.error(msg)
             else:
                 msg and self.error(msg)
-            self._log(MsgLevel.VERBOSE, ''.join(lines))
+            self._log(MsgLevel.ERROR, ''.join(lines))
This diff is collapsed.
## Only configure this section when remote login is required.
# user:
# username: your username
# password: your password if needed
# key_file: your ssh-key file path if needed
# port: your ssh port, default 22
# timeout: ssh connection timeout (second), default 30
oceanbase-ce:
servers:
- name: z1
# Please don't use a hostname; only IPs are supported.
ip: 172.19.33.2
- name: z2
ip: 172.19.33.3
- name: z3
ip: 172.19.33.4
global:
# Set devname to the name of the network adapter whose IP is listed under servers.
# If servers is set to "127.0.0.1", set devname to "lo".
# If the current IP is 192.168.1.10 and that IP belongs to the adapter named "eth0", set devname to "eth0".
devname: eth0
# If the server has less than 50G of memory, use the settings in "mini-single-example.yaml" as a starting point and adjust as needed.
memory_limit: 64G
datafile_disk_percentage: 20
syslog_level: INFO
enable_syslog_wf: false
enable_syslog_recycle: true
max_syslog_file_count: 4
cluster_id: 1
# root_password: # root user password
# This example supports multiple observer processes on a single node, so different processes must use different ports.
# When deploying an OceanBase cluster across multiple nodes, the port and path settings can be the same on every node.
z1:
mysql_port: 2881
rpc_port: 2882
home_path: /root/observer
zone: zone1
z2:
mysql_port: 2881
rpc_port: 2882
home_path: /root/observer
zone: zone2
z3:
mysql_port: 2881
rpc_port: 2882
home_path: /root/observer
zone: zone3
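As the comments above note, observers that share one node must use distinct ports, while observers on different IPs may reuse them. A small self-contained sketch of that check (a hypothetical helper, not part of OBD):

```python
# Verify that observers sharing an IP use distinct mysql_port/rpc_port.
def check_ports(servers):
    used = {}
    for name, conf in servers.items():
        for key in ('mysql_port', 'rpc_port'):
            slot = (conf['ip'], conf[key])
            if slot in used:
                raise ValueError('%s reuses %s of %s' % (name, key, used[slot]))
            used[slot] = name

check_ports({
    'z1': {'ip': '172.19.33.2', 'mysql_port': 2881, 'rpc_port': 2882},
    'z2': {'ip': '172.19.33.3', 'mysql_port': 2881, 'rpc_port': 2882},
})  # passes: same ports, different IPs
```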
## Only configure this section when remote login is required.
# user:
# username: your username
# password: your password if needed
# key_file: your ssh-key file path if needed
# port: your ssh port, default 22
# timeout: ssh connection timeout (second), default 30
oceanbase-ce:
servers:
- name: z1
# Please don't use a hostname; only IPs are supported.
ip: 192.168.1.2
- name: z2
ip: 192.168.1.3
- name: z3
ip: 192.168.1.4
global:
# The working directory for OceanBase Database. OceanBase Database is started under this directory. This is a required field.
home_path: /root/observer
# The directory for data storage. The default value is $home_path/store.
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# External port for OceanBase Database. The default value is 2881.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882.
# rpc_port: 2882
# Defines the zone for an observer. The default value is zone1.
# zone: zone1
# The maximum running memory for an observer. When omitted, autodeploy calculates this value from the server's available resources.
# memory_limit: 58G
# The percentage of the maximum available memory to the total memory. This value takes effect only when memory_limit is 0. The default value is 80.
# memory_limit_percentage: 80
# The reserved system memory. system_memory is reserved for general tenants. The default value is 30G. Autodeploy calculates this value from the server's available resources.
# system_memory: 22G
# The size of a data file. When omitted, autodeploy calculates this value from the server's available resources.
# datafile_size: 200G
# The percentage of the data_dir space to the total disk space. This value takes effect only when datafile_size is 0. The default value is 90.
# datafile_disk_percentage: 90
# System log level. The default value is INFO.
# syslog_level: INFO
# Print system logs whose levels are higher than WARNING to a separate log file. The default value is true. The default value for autodeploy mode is false.
# enable_syslog_wf: false
# Enable auto system log recycling or not. The default value is false; in autodeploy mode the default is true.
# enable_syslog_recycle: true
# The maximum number of reserved log files before enabling auto recycling. When set to 0, no logs are deleted. The default value for autodeploy mode is 4.
# max_syslog_file_count: 4
# Cluster name for OceanBase Database. The default value is obcluster. When you deploy OceanBase Database and obproxy, this value must be the same as the cluster_name for obproxy.
# appname: obcluster
# Password for root. The default value is empty.
# root_password:
# Password for proxyro. proxyro_password must be the same as observer_sys_password. The default value is empty.
# proxyro_password:
z1:
zone: zone1
z2:
zone: zone2
z3:
zone: zone3
obproxy:
servers:
- 192.168.1.5
global:
# The working directory for obproxy. Obproxy is started under this directory. This is a required field.
home_path: /root/obproxy
# External port. The default value is 2883.
# listen_port: 2883
# The Prometheus port. The default value is 2884.
# prometheus_listen_port: 2884
# rs_list is the root server list for observers. The default root server is the first server in the zone.
# The format for rs_list is observer_ip:observer_mysql_port;observer_ip:observer_mysql_port.
# Ignore this value in autodeploy mode.
# rs_list: 127.0.0.1:2881
# Cluster name of the OceanBase Database that obproxy proxies. The default value is obcluster. This value must be the same as the appname of OceanBase Database.
# cluster_name: obcluster
# Password for obproxy system tenant. The default value is empty.
# obproxy_sys_password:
# Password for proxyro. observer_sys_password must be the same as the proxyro_password of OceanBase Database. The default value is empty.
# observer_sys_password:
\ No newline at end of file
## Only configure this section when remote login is required.
# user:
# username: your username
# password: your password if needed
# key_file: your ssh-key file path if needed
# port: your ssh port, default 22
# timeout: ssh connection timeout (second), default 30
oceanbase-ce:
servers:
# Please don't use a hostname; only IPs are supported.
- 192.168.1.3
global:
# The working directory for OceanBase Database. OceanBase Database is started under this directory. This is a required field.
home_path: /root/observer
# The directory for data storage. The default value is $home_path/store.
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# External port for OceanBase Database. The default value is 2881.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882.
# rpc_port: 2882
# Defines the zone for an observer. The default value is zone1.
# zone: zone1
# The maximum running memory for an observer. When omitted, autodeploy calculates this value from the server's available resources.
# memory_limit: 58G
# The percentage of the maximum available memory to the total memory. This value takes effect only when memory_limit is 0. The default value is 80.
# memory_limit_percentage: 80
# The reserved system memory. system_memory is reserved for general tenants. The default value is 30G. Autodeploy calculates this value from the server's available resources.
# system_memory: 22G
# The size of a data file. When omitted, autodeploy calculates this value from the server's available resources.
# datafile_size: 200G
# The percentage of the data_dir space to the total disk space. This value takes effect only when datafile_size is 0. The default value is 90.
# datafile_disk_percentage: 90
# System log level. The default value is INFO.
# syslog_level: INFO
# Print system logs whose levels are higher than WARNING to a separate log file. The default value is true. The default value for autodeploy mode is false.
# enable_syslog_wf: false
# Enable auto system log recycling or not. The default value is false; in autodeploy mode the default is true.
# enable_syslog_recycle: true
# The maximum number of reserved log files before enabling auto recycling. When set to 0, no logs are deleted. The default value for autodeploy mode is 4.
# max_syslog_file_count: 4
# Cluster name for OceanBase Database. The default value is obcluster. When you deploy OceanBase Database and obproxy, this value must be the same as the cluster_name for obproxy.
# appname: obcluster
# Password for root. The default value is empty.
# root_password:
# Password for proxyro. proxyro_password must be the same as observer_sys_password. The default value is empty.
# proxyro_password:
\ No newline at end of file
## Only configure this section when remote login is required.
# user:
# username: your username
# password: your password if needed
# key_file: your ssh-key file path if needed
# port: your ssh port, default 22
# timeout: ssh connection timeout (second), default 30
oceanbase-ce:
servers:
# Please don't use a hostname; only IPs are supported.
- 192.168.1.3
global:
# The working directory for OceanBase Database. OceanBase Database is started under this directory. This is a required field.
home_path: /root/observer
# The directory for data storage. The default value is $home_path/store.
# data_dir: /data
# The directory for clog, ilog, and slog. The default value is the same as the data_dir value.
# redo_dir: /redo
# External port for OceanBase Database. The default value is 2881.
# mysql_port: 2881
# Internal port for OceanBase Database. The default value is 2882.
# rpc_port: 2882
# Defines the zone for an observer. The default value is zone1.
# zone: zone1
# The maximum running memory for an observer. When omitted, autodeploy calculates this value from the server's available resources.
# memory_limit: 58G
# The percentage of the maximum available memory to the total memory. This value takes effect only when memory_limit is 0. The default value is 80.
# memory_limit_percentage: 80
# The reserved system memory. system_memory is reserved for general tenants. The default value is 30G. Autodeploy calculates this value from the server's available resources.
# system_memory: 22G
# The size of a data file. When omitted, autodeploy calculates this value from the server's available resources.
# datafile_size: 200G
# The percentage of the data_dir space to the total disk space. This value takes effect only when datafile_size is 0. The default value is 90.
# datafile_disk_percentage: 90
# System log level. The default value is INFO.
# syslog_level: INFO
# Print system logs whose levels are higher than WARNING to a separate log file. The default value is true. The default value for autodeploy mode is false.
# enable_syslog_wf: false
# Enable auto system log recycling or not. The default value is false; in autodeploy mode the default is true.
# enable_syslog_recycle: true
# The maximum number of reserved log files before enabling auto recycling. When set to 0, no logs are deleted. The default value for autodeploy mode is 4.
# max_syslog_file_count: 4
# Cluster name for OceanBase Database. The default value is obcluster. When you deploy OceanBase Database and obproxy, this value must be the same as the cluster_name for obproxy.
# appname: obcluster
# Password for root. The default value is empty.
# root_password:
# Password for proxyro. proxyro_password must be the same as observer_sys_password. The default value is empty.
# proxyro_password:
obproxy:
servers:
- 192.168.1.2
global:
# The working directory for obproxy. Obproxy is started under this directory. This is a required field.
home_path: /root/obproxy
# External port. The default value is 2883.
# listen_port: 2883
# The Prometheus port. The default value is 2884.
# prometheus_listen_port: 2884
# rs_list is the root server list for observers. The default root server is the first server in the zone.
# The format for rs_list is observer_ip:observer_mysql_port;observer_ip:observer_mysql_port.
# Ignore this value in autodeploy mode.
# rs_list: 127.0.0.1:2881
# Cluster name of the OceanBase Database that obproxy proxies. The default value is obcluster. This value must be the same as the appname of OceanBase Database.
# cluster_name: obcluster
# Password for obproxy system tenant. The default value is empty.
# obproxy_sys_password:
# Password for proxyro. observer_sys_password must be the same as the proxyro_password of OceanBase Database. The default value is empty.
# observer_sys_password:
\ No newline at end of file
@@ -32,7 +32,6 @@ spm=['spm.spm_expr','spm.spm_with_acs_new_plan_not_ok']
 merge_into=['merge_into.merge_insert','merge_into.merge_into_normal', 'merge_into.merge_subquery', 'merge_into.merge_update']
 
-# TODO bin.lb:
 # Temporary failure, remove this after updatable view commit.
 updatable_view = ['view.is_views', 'create_frommysql' ]
......
 --disable_query_log
 set @@session.explicit_defaults_for_timestamp=off;
 --enable_query_log
+# owner: jim.wjh
+# owner group: SQL3
+# description: foobar
 --echo case 1: commit
 connect (conn1,$OBMYSQL_MS0,$OBMYSQL_USR,$OBMYSQL_PWD,test,$OBMYSQL_PORT);
 connection conn1;
......
@@ -27,13 +27,14 @@ def bootstrap(plugin_context, cursor, *args, **kwargs):
     for server in cluster_config.servers:
         server_config = cluster_config.get_server_conf(server)
         for key in ['observer_sys_password', 'obproxy_sys_password']:
-            if key in server_config and server_config[key]:
-                try:
-                    sql = 'alter proxyconfig set %s = %%s' % key
+            if server_config.get(key):
+                sql = 'alter proxyconfig set %s = %%s' % key
+                value = None
+                try:
                     value = str(server_config[key])
                     stdio.verbose('execute sql: %s' % (sql % value))
                     cursor[server].execute(sql, [value])
                 except:
-                    stdio.exception('execute sql exception')
+                    stdio.exception('execute sql exception: %s' % (sql % (value)))
                     stdio.warm('failed to set %s for obproxy(%s)' % (key, server))
     plugin_context.return_true()
@@ -20,17 +20,27 @@
 from __future__ import absolute_import, division, print_function
 
 
-def init(plugin_context, *args, **kwargs):
+def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
     cluster_config = plugin_context.cluster_config
     clients = plugin_context.clients
     stdio = plugin_context.stdio
     global_ret = True
+    stdio.start_loading('Initializes cluster work home')
     for server in cluster_config.servers:
         server_config = cluster_config.get_server_conf(server)
         client = clients[server]
         home_path = server_config['home_path']
-        stdio.print('%s init cluster work home', server)
-        if not client.execute_command('mkdir -p %s/run' % (home_path)):
+        remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
+        remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
+        stdio.verbose('%s init cluster work home', server)
+        if not (client.execute_command("bash -c 'mkdir -p %s/{run,bin,lib}'" % (home_path)) \
+            and client.execute_command("if [ -d %s/bin ]; then ln -s %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
+            and client.execute_command("if [ -d %s/lib ]; then ln -s %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))):
             global_ret = False
-            stdio.print('fail to init %s home path', server)
-    global_ret and plugin_context.return_true()
+            stdio.verbose('fail to init %s home path', server)
+    if global_ret:
+        stdio.stop_loading('succeed')
+        plugin_context.return_true()
+    else:
+        stdio.stop_loading('fail')
\ No newline at end of file
@@ -21,7 +21,7 @@
   max_value: 65535
   need_restart: true
   description_en: obproxy prometheus listen port
-  description_local: SQL service protocol port
+  description_local: port that serves prometheus
- name: appname
  require: false
  type: STRING
......
@@ -49,6 +49,7 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
     stdio.verbose('apply new configuration')
     success_conf = {}
     sql = ''
+    value = None
     for key in global_change_conf:
         success_conf[key] = []
         for server in servers:
@@ -56,13 +57,13 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
                 continue
             try:
                 sql = 'alter proxyconfig set %s = %%s' % key
-                value = change_conf[server][key]
+                value = change_conf[server][key] if change_conf[server].get(key) is not None else ''
                 stdio.verbose('execute sql: %s' % (sql % value))
                 cursor[server].execute(sql, [value])
                 success_conf[key].append(server)
             except:
                 global_ret = False
-                stdio.exception('execute sql exception: %s' % sql)
+                stdio.exception('execute sql exception: %s' % (sql % value))
     for key in success_conf:
         if global_change_conf[key] == servers_num == len(success_conf):
             cluster_config.update_global_conf(key, value, False)
......
@@ -49,14 +49,13 @@ def confirm_port(client, pid, port):
 
 def confirm_command(client, pid, command):
     command = command.replace(' ', '').strip()
-    if client.execute_command('cmd=`cat /proc/%s/cmdline`; if [ "$cmd" != "%s" ]; then exit 1; fi' % (pid, command)):
+    if client.execute_command('bash -c \'cmd=`cat /proc/%s/cmdline`; if [ "$cmd" != "%s" ]; then exit 1; fi\'' % (pid, command)):
         return True
     return False
 
 
 def confirm_home_path(client, pid, home_path):
-    if client.execute_command('path=`ls -l /proc/%s | grep cwd | awk -F\'-> \' \'{print $2}\'`; if [ "$path" != "%s" ]; then exit 1; fi' %
-                              (pid, home_path)):
+    if client.execute_command('path=`ls -l /proc/%s | grep cwd | awk -F\'-> \' \'{print $2}\'`; bash -c \'if [ "$path" != "%s" ]; then exit 1; fi\'' % (pid, home_path)):
         return True
     return False
@@ -77,6 +76,7 @@ def is_started(client, remote_bin_path, port, home_path, command):
         return False
     return confirm_home_path(client, pid, home_path) and confirm_command(client, pid, command)
 
+
 def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
     global stdio
     cluster_config = plugin_context.cluster_config
@@ -85,9 +85,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
     clusters_cmd = {}
     real_cmd = {}
     pid_path = {}
-    remote_bin_path = {}
     need_bootstrap = True
-    bin_path = os.path.join(repository_dir, 'bin/obproxy')
     error = False
 
     for server in cluster_config.servers:
@@ -99,17 +97,20 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
     if error:
         return plugin_context.return_false()
 
-    servers_remote_home_path = {}
     stdio.start_loading('Start obproxy')
     for server in cluster_config.servers:
         client = clients[server]
-        remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
-        servers_remote_home_path[server] = remote_home_path
-        remote_bin_path[server] = bin_path.replace(local_home_path, remote_home_path)
         server_config = cluster_config.get_server_conf(server)
-        pid_path[server] = "%s/run/obproxy-%s-%s.pid" % (server_config['home_path'], server.ip, server_config["listen_port"])
+        home_path = server_config['home_path']
+        if client.execute_command("bash -c 'if [ -f %s/bin/obproxy ]; then exit 1; else exit 0; fi;'" % home_path):
+            remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
+            remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
+            client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
+            client.execute_command("ln -s %s/bin/* %s/bin" % (remote_repository_dir, home_path))
+            client.execute_command("ln -s %s/lib/* %s/lib" % (remote_repository_dir, home_path))
+        pid_path[server] = "%s/run/obproxy-%s-%s.pid" % (home_path, server.ip, server_config["listen_port"])
         not_opt_str = [
             'listen_port',
             'prometheus_listen_port',
@@ -128,8 +129,8 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
             if key in server_config:
                 value = get_value(key)
                 cmd.append('--%s %s' % (key, value))
-        real_cmd[server] = '%s %s' % (remote_bin_path[server], ' '.join(cmd))
-        clusters_cmd[server] = 'cd %s; %s' % (server_config['home_path'], real_cmd[server])
+        real_cmd[server] = '%s/bin/obproxy %s' % (home_path, ' '.join(cmd))
+        clusters_cmd[server] = 'cd %s; %s' % (home_path, real_cmd[server])
 
     for server in clusters_cmd:
         client = clients[server]
@@ -149,8 +150,7 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
             return plugin_context.return_false()
         stdio.verbose('starting %s obproxy', server)
-        remote_repository_path = repository_dir.replace(local_home_path, remote_home_path)
-        client.add_env('LD_LIBRARY_PATH', '%s/lib:' % remote_repository_path, True)
+        client.add_env('LD_LIBRARY_PATH', '%s/lib:' % server_config['home_path'], True)
         ret = client.execute_command(clusters_cmd[server])
         client.add_env('LD_LIBRARY_PATH', '', True)
         if not ret:
......
@@ -72,8 +72,8 @@ def start_check(plugin_context, strict_check=False, *args, **kwargs):
         stdio.verbose('%s port check' % server)
         for key in ['listen_port', 'prometheus_listen_port']:
             port = int(server_config[key])
-            if port in ports:
-                alert_f = alert if key == 'prometheus_listen_port' else critical
+            alert_f = alert if key == 'prometheus_listen_port' else critical
+            if port in ports:
                 alert_f('Configuration conflict %s: %s port is used for %s\'s %s' % (server, port, ports[port]['server'], ports[port]['key']))
                 continue
             ports[port] = {
@@ -81,7 +81,7 @@
                 'key': key
             }
             if get_port_socket_inode(client, port):
-                critical('%s:%s port is already used' % (ip, port))
+                alert_f('%s:%s port is already used' % (ip, port))
     if success:
         stdio.stop_loading('succeed')
......
@@ -37,6 +37,9 @@ def bootstrap(plugin_context, cursor, *args, **kwargs):
             floor_servers[zone] = []
         bootstrap.append('REGION "sys_region" ZONE "%s" SERVER "%s:%s"' % (server_config['zone'], server.ip, server_config['rpc_port']))
     try:
+        sql = 'set session ob_query_timeout=1000000000'
+        stdio.verbose('execute sql: %s' % sql)
+        cursor.execute(sql)
         sql = 'alter system bootstrap %s' % (','.join(bootstrap))
         stdio.start_loading('Cluster bootstrap')
         stdio.verbose('execute sql: %s' % sql)
@@ -48,7 +51,7 @@
             cursor.execute(sql)
         global_conf = cluster_config.get_global_conf()
         if 'proxyro_password' in global_conf or 'obproxy' in plugin_context.components:
-            value = global_conf['proxyro_password'] if 'proxyro_password' in global_conf else ''
+            value = global_conf['proxyro_password'] if global_conf.get('proxyro_password') is not None else ''
             sql = 'create user "proxyro" IDENTIFIED BY "%s"' % value
             stdio.verbose(sql)
             cursor.execute(sql)
......
@@ -40,8 +40,9 @@ def destroy(plugin_context, *args, **kwargs):
         server_config = cluster_config.get_server_conf(server)
         stdio.verbose('%s work path cleaning', server)
         clean(server, server_config['home_path'])
-        if 'data_dir' in server_config:
-            clean(server, server_config['data_dir'])
+        for key in ['data_dir', 'redo_dir', 'clog_dir', 'ilog_dir', 'slog_dir']:
+            if server_config.get(key):
+                clean(server, server_config[key])
     if global_ret:
         stdio.stop_loading('succeed')
         plugin_context.return_true()
......
@@ -20,9 +20,7 @@
 from __future__ import absolute_import, division, print_function
 
-import sys
 import time
-from prettytable import PrettyTable
 
 
 def display(plugin_context, cursor, *args, **kwargs):
......
@@ -57,7 +57,7 @@ def init_dir(server, client, key, path, link_path=None):
         return False
 
 
-def init(plugin_context, *args, **kwargs):
+def init(plugin_context, local_home_path, repository_dir, *args, **kwargs):
     global stdio, force
     cluster_config = plugin_context.cluster_config
     clients = plugin_context.clients
@@ -65,6 +65,7 @@ def init(plugin_context, *args, **kwargs):
     servers_dirs = {}
     force = getattr(plugin_context.options, 'force', False)
     stdio.verbose('option `force` is %s' % force)
+    stdio.start_loading('Initializes cluster work home')
     for server in cluster_config.servers:
         ip = server.ip
         if ip not in servers_dirs:
@@ -73,15 +74,24 @@
         server_config = cluster_config.get_server_conf(server)
         client = clients[server]
         home_path = server_config['home_path']
-        if 'data_dir' not in server_config:
+        remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
+        remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
+        if not server_config.get('data_dir'):
             server_config['data_dir'] = '%s/store' % home_path
-        if 'clog_dir' not in server_config:
-            server_config['clog_dir'] = '%s/clog' % server_config['data_dir']
-        if 'ilog_dir' not in server_config:
-            server_config['ilog_dir'] = '%s/ilog' % server_config['data_dir']
-        if 'slog_dir' not in server_config:
-            server_config['slog_dir'] = '%s/slog' % server_config['data_dir']
-        for key in ['home_path', 'data_dir', 'clog_dir', 'ilog_dir', 'slog_dir']:
+        if not server_config.get('redo_dir'):
+            server_config['redo_dir'] = server_config['data_dir']
+        if not server_config.get('clog_dir'):
+            server_config['clog_dir'] = '%s/clog' % server_config['redo_dir']
+        if not server_config.get('ilog_dir'):
+            server_config['ilog_dir'] = '%s/ilog' % server_config['redo_dir']
+        if not server_config.get('slog_dir'):
+            server_config['slog_dir'] = '%s/slog' % server_config['redo_dir']
+        if server_config['redo_dir'] == server_config['data_dir']:
+            keys = ['home_path', 'data_dir', 'clog_dir', 'ilog_dir', 'slog_dir']
+        else:
+            keys = ['home_path', 'data_dir', 'redo_dir', 'clog_dir', 'ilog_dir', 'slog_dir']
+        for key in keys:
             path = server_config[key]
             if path in dirs:
                 critical('Configuration conflict %s: %s is used for %s\'s %s' % (server, path, dirs[path]['server'], dirs[path]['key']))
@@ -91,7 +101,7 @@
                 'key': key,
             }
-        stdio.print('%s initializes cluster work home' % server)
+        stdio.verbose('%s initializes cluster work home' % server)
         if force:
             ret = client.execute_command('rm -fr %s/*' % home_path)
             if not ret:
@@ -105,7 +115,9 @@
                 continue
             else:
                 critical('fail to init %s home path: create %s failed' % (server, home_path))
-        ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log}"' % home_path)
+        ret = client.execute_command('bash -c "mkdir -p %s/{etc,admin,.conf,log,bin,lib}"' % home_path) \
+              and client.execute_command("if [ -d %s/bin ]; then ln -s %s/bin/* %s/bin; fi" % (remote_repository_dir, remote_repository_dir, home_path)) \
+              and client.execute_command("if [ -d %s/lib ]; then ln -s %s/lib/* %s/lib; fi" % (remote_repository_dir, remote_repository_dir, home_path))
         if ret:
             data_path = server_config['data_dir']
             if force:
@@ -121,7 +133,7 @@
                 continue
             else:
                 critical('fail to init %s data path: create %s failed' % (server, data_path))
-            ret = client.execute_command('mkdir -p %s/sstable' % data_path)
+            ret = client.execute_command('bash -c "mkdir -p %s/sstable"' % data_path)
             if ret:
                 link_path = '%s/store' % home_path
                 client.execute_command("if [ ! '%s' -ef '%s' ]; then ln -sf %s %s; fi" % (data_path, link_path, data_path, link_path))
@@ -151,4 +163,9 @@
                 critical('failed to initialize %s date path' % (server))
         else:
             critical('fail to init %s home path: %s permission denied' % (server, ret.stderr))
-    global_ret and plugin_context.return_true()
+
+    if global_ret:
+        stdio.stop_loading('succeed')
+        plugin_context.return_true()
+    else:
+        stdio.stop_loading('fail')
\ No newline at end of file
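The directory-defaulting chain introduced here (`data_dir` under `home_path`, `redo_dir` defaulting to `data_dir`, and clog/ilog/slog under `redo_dir`) is easiest to see in isolation. A minimal sketch that mirrors the plugin's config keys but is not OBD API:

```python
def resolve_dirs(server_config):
    # Same defaulting order as the init plugin above.
    home = server_config['home_path']
    if not server_config.get('data_dir'):
        server_config['data_dir'] = '%s/store' % home
    if not server_config.get('redo_dir'):
        server_config['redo_dir'] = server_config['data_dir']
    for name in ('clog', 'ilog', 'slog'):
        key = '%s_dir' % name
        if not server_config.get(key):
            server_config[key] = '%s/%s' % (server_config['redo_dir'], name)
    return server_config

# With redo_dir set, the log dirs follow the redo disk, not the data disk:
print(resolve_dirs({'home_path': '/root/observer', 'redo_dir': '/redo'})['clog_dir'])
# /redo/clog
```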
@@ -9,6 +9,7 @@
 - name: cluster_id
   require: true
   type: INT
+  default: 1
   min_value: 1
   max_value: 4294901759
   need_restart: true
@@ -21,13 +22,20 @@
   need_redeploy: true
   description_en: the directory for the data file
   description_local: directory for sstable and other data
+- name: redo_dir
+  type: STRING
+  min_value: NULL
+  max_value: NULL
+  need_redeploy: true
+  description_en: the directory for the redo file
+  description_local: directory for clog, ilog, and slog data
 - name: clog_dir
   type: STRING
   min_value: NULL
   max_value: NULL
   need_redeploy: true
   description_en: the directory for the clog file
-  description_local: directory for clog data
+  description_local: directory for clog data; clog should share a disk with ilog
 - name: slog_dir
   type: STRING
   min_value: NULL
@@ -52,7 +60,7 @@
 - name: rpc_port
   require: true
   type: INT
-  default: 2500
+  default: 2882
   min_value: 1025
   max_value: 65535
   need_restart: true
@@ -61,7 +69,7 @@
 - name: mysql_port
   require: true
   type: INT
-  default: 2880
+  default: 2881
   min_value: 1025
   max_value: 65535
   need_restart: true
@@ -1641,7 +1649,7 @@
   min_value: 0
   max_value: NULL
   section: OBSERVER
-  need_restart: false
+  need_restart: true
   description_en: the number of CPUs in the system. If this parameter is set to zero, the number will be set according to sysconf; otherwise, this parameter is used.
   description_local: total number of CPUs in the system; if set to 0, it is detected automatically
 - name: auto_delete_expired_backup
......
@@ -39,32 +39,37 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
         cluster_server[server] = '%s:%s' % (server.ip, config['rpc_port'])
         stdio.verbose('compare configuration of %s' % (server))
         for key in new_config:
-            if key not in config or config[key] != new_config[key]:
-                change_conf[server][key] = new_config[key]
+            n_value = new_config[key]
+            if key not in config or config[key] != n_value:
+                change_conf[server][key] = n_value
                 if key not in global_change_conf:
-                    global_change_conf[key] = 1
-                else:
-                    global_change_conf[key] += 1
+                    global_change_conf[key] = {'value': n_value, 'count': 1}
+                elif n_value == global_change_conf[key]['value']:
+                    global_change_conf[key]['count'] += 1
 
     servers_num = len(servers)
     stdio.verbose('apply new configuration')
     for key in global_change_conf:
-        sql = ''
+        msg = ''
         try:
             if key in ['proxyro_password', 'root_password']:
-                if global_change_conf[key] != servers_num:
+                if global_change_conf[key]['count'] != servers_num:
                     stdio.warn('Invalid: proxyro_password is not a single server configuration item')
                     continue
-                value = change_conf[server][key]
+                value = change_conf[server][key] if change_conf[server].get(key) is not None else ''
                 user = key.split('_')[0]
-                sql = 'alter user "%s" IDENTIFIED BY "%s"' % (user, value if value else '')
+                msg = sql = 'CREATE USER IF NOT EXISTS %s IDENTIFIED BY "%s"' % (user, value)
+                stdio.verbose('execute sql: %s' % sql)
+                cursor.execute(sql)
+                msg = sql = 'alter user "%s" IDENTIFIED BY "%s"' % (user, value)
                 stdio.verbose('execute sql: %s' % sql)
                 cursor.execute(sql)
                 continue
-            if global_change_conf[key] == servers_num:
+            if global_change_conf[key]['count'] == servers_num:
                 sql = 'alter system set %s = %%s' % key
                 value = change_conf[server][key]
-                stdio.verbose('execute sql: %s' % (sql % value))
+                msg = sql % value
+                stdio.verbose('execute sql: %s' % msg)
                 cursor.execute(sql, [value])
                 cluster_config.update_global_conf(key, value, False)
                 continue
@@ -72,13 +77,14 @@ def reload(plugin_context, cursor, new_cluster_config, *args, **kwargs):
                 if key not in change_conf[server]:
                     continue
                 sql = 'alter system set %s = %%s server=%%s' % key
-                value = change_conf[server][key]
-                stdio.verbose('execute sql: %s' % (sql % (value, server)))
-                cursor.execute(sql, [value, server])
+                value = (change_conf[server][key], cluster_server[server])
+                msg = sql % value
+                stdio.verbose('execute sql: %s' % msg)
+                cursor.execute(sql, value)
                 cluster_config.update_server_conf(server,key, value, False)
         except:
             global_ret = False
-            stdio.exception('execute sql exception: %s' % sql)
+            stdio.exception('execute sql exception: %s' % msg)
 
     cursor.execute('alter system reload server')
     cursor.execute('alter system reload zone')
......
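The reload rewrite replaces the plain per-key counter with a `{'value': ..., 'count': ...}` record, so a key counts as a cluster-wide change only when every server moves to the same value; anything short of `servers_num` falls through to per-server `alter system set ... server=...` statements. The counting logic on its own, as a sketch with plain dicts and no cursors or stdio:

```python
def classify_changes(servers, old_conf, new_conf):
    # A key is a "global" change only when all servers change to the
    # same value; otherwise it is applied server by server.
    change_conf = {server: {} for server in servers}
    global_change_conf = {}
    for server in servers:
        config = old_conf[server]
        for key, n_value in new_conf[server].items():
            if key not in config or config[key] != n_value:
                change_conf[server][key] = n_value
                if key not in global_change_conf:
                    global_change_conf[key] = {'value': n_value, 'count': 1}
                elif n_value == global_change_conf[key]['value']:
                    global_change_conf[key]['count'] += 1
    return change_conf, global_change_conf
```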
@@ -64,7 +64,6 @@ def start(plugin_context, local_home_path, repository_dir, *args, **kwargs):
     stdio = plugin_context.stdio
     clusters_cmd = {}
     need_bootstrap = True
-    bin_path = os.path.join(repository_dir, 'bin/observer')
     root_servers = {}
     global_config = cluster_config.get_global_conf()
     appname = global_config['appname'] if 'appname' in global_config else None
@@ -92,26 +91,22 @@
         root_servers[zone] = '%s:%s:%s' % (server.ip, config['rpc_port'], config['mysql_port'])
     rs_list_opt = '-r \'%s\'' % ';'.join([root_servers[zone] for zone in root_servers])
 
-    servers_remote_home_path = {}
     for server in cluster_config.servers:
         client = clients[server]
-        remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
-        servers_remote_home_path[server] = remote_home_path
-        remote_bin_path = bin_path.replace(local_home_path, remote_home_path)
         server_config = cluster_config.get_server_conf(server)
-
-        req_check = ['home_path', 'cluster_id']
-        for key in req_check:
-            if key not in server_config:
-                stdio.stop_loading('fail')
-                stdio.print('%s %s is empty', server, key)
-                return plugin_context.return_false()
-
-        home_path = server_config['home_path']
-        if 'data_dir' not in server_config:
+        home_path = server_config['home_path']
+        if client.execute_command("bash -c 'if [ -f %s/bin/observer ]; then exit 1; else exit 0; fi;'" % home_path):
+            remote_home_path = client.execute_command('echo $HOME/.obd').stdout.strip()
+            remote_repository_dir = repository_dir.replace(local_home_path, remote_home_path)
+            client.execute_command("bash -c 'mkdir -p %s/{bin,lib}'" % (home_path))
+            client.execute_command("ln -s %s/bin/* %s/bin" % (remote_repository_dir, home_path))
+            client.execute_command("ln -s %s/lib/* %s/lib" % (remote_repository_dir, home_path))
+        if not server_config.get('data_dir'):
             server_config['data_dir'] = '%s/store' % home_path
-        if client.execute_command('ls %s/clog' % server_config['data_dir']).stdout.strip():
+        if client.execute_command('ls %s/ilog/' % server_config['data_dir']).stdout.strip():
             need_bootstrap = False
         remote_pid_path = '%s/run/observer.pid' % home_path
@@ -151,13 +146,13 @@
             if key in server_config:
                 value = get_value(key)
                 cmd.append('%s %s' % (not_opt_str[key], value))
-        clusters_cmd[server] = 'cd %s; %s %s' % (home_path, remote_bin_path, ' '.join(cmd))
+        clusters_cmd[server] = 'cd %s; %s/bin/observer %s' % (home_path, home_path, ' '.join(cmd))
 
     for server in clusters_cmd:
         client = clients[server]
+        server_config = cluster_config.get_server_conf(server)
         stdio.verbose('starting %s observer', server)
-        remote_repository_path = repository_dir.replace(local_home_path, remote_home_path)
-        client.add_env('LD_LIBRARY_PATH', '%s/lib:' % remote_repository_path, True)
+        client.add_env('LD_LIBRARY_PATH', '%s/lib:' % server_config['home_path'], True)
         ret = client.execute_command(clusters_cmd[server])
         client.add_env('LD_LIBRARY_PATH', '', True)
         if not ret:
......
@@ -22,6 +22,7 @@ from __future__ import absolute_import, division, print_function
 
 import os
 import re
+import time
 
 stdio = None
@@ -58,7 +59,17 @@ def formate_size(size):
     return '%.1f%s' % (size, units[idx])
 
 
-def start_check(plugin_context, strict_check=False, *args, **kwargs):
+def time_delta(client):
+    time_st = time.time() * 1000
+    time_srv = int(client.execute_command('date +%s%N').stdout) / 1000000
+    time_ed = time.time() * 1000
+
+    time_it = time_ed - time_st
+    time_srv -= time_it
+    return time_srv - time_st
+
+
+def _start_check(plugin_context, strict_check=False, *args, **kwargs):
     def alert(*arg, **kwargs):
         global success
         if strict_check:
@@ -78,6 +89,8 @@
     servers_port = {}
     servers_memory = {}
     servers_disk = {}
+    servers_clog_mount = {}
+    servers_net_inferface = {}
     server_num = len(cluster_config.servers)
     stdio.start_loading('Check before start observer')
     for server in cluster_config.servers:
@@ -95,10 +108,14 @@
         if ip not in servers_port:
             servers_disk[ip] = {}
             servers_port[ip] = {}
+            servers_clog_mount[ip] = {}
+            servers_net_inferface[ip] = {}
             servers_memory[ip] = {'num': 0, 'percentage': 0}
         memory = servers_memory[ip]
         ports = servers_port[ip]
         disk = servers_disk[ip]
+        clog_mount = servers_clog_mount[ip]
+        inferfaces = servers_net_inferface[ip]
         stdio.verbose('%s port check' % server)
         for key in ['mysql_port', 'rpc_port']:
             port = int(server_config[key])
@@ -112,25 +129,50 @@
             if get_port_socket_inode(client, port):
                 critical('%s:%s port is already used' % (ip, port))
         if 'memory_limit' in server_config:
-            memory['num'] += parse_size(server_config['memory_limit'])
+            try:
+                memory['num'] += parse_size(server_config['memory_limit'])
+            except:
+                critical('memory_limit must be an integer')
+                return
         elif 'memory_limit_percentage' in server_config:
-            memory['percentage'] += int(parse_size(server_config['memory_limit_percentage']))
+            try:
+                memory['percentage'] += int(parse_size(server_config['memory_limit_percentage']))
+            except:
+                critical('memory_limit_percentage must be an integer')
+                return
         else:
             memory['percentage'] += 80
-        data_path = server_config['data_dir'] if 'data_dir' in server_config else os.path.join(server_config['home_path'], 'store')
+        data_path = server_config['data_dir'] if server_config.get('data_dir') else os.path.join(server_config['home_path'], 'store')
+        redo_dir = server_config['redo_dir'] if server_config.get('redo_dir') else data_path
+        clog_dir = server_config['clog_dir'] if server_config.get('clog_dir') else os.path.join(redo_dir, 'clog')
         if not client.execute_command('ls %s/sstable/block_file' % data_path):
             if data_path in disk:
                 critical('Same Path: %s in %s and %s' % (data_path, server, disk[data_path]['server']))
                 continue
+            if clog_dir in clog_mount:
+                critical('Same Path: %s in %s and %s' % (clog_dir, server, clog_mount[clog_dir]['server']))
+                continue
             disk[data_path] = {
                 'need': 90,
                 'server': server
             }
+            clog_mount[clog_dir] = {
+                'threshold': server_config.get('clog_disk_utilization_threshold', 80) / 100.0,
+                'server': server
+            }
             if 'datafile_size' in server_config and server_config['datafile_size']:
                 disk[data_path]['need'] = server_config['datafile_size']
             elif 'datafile_disk_percentage' in server_config and server_config['datafile_disk_percentage']:
                 disk[data_path]['need'] = int(server_config['datafile_disk_percentage'])
+        devname = server_config.get('devname')
+        if devname:
+            if not client.execute_command("grep -e ' %s:' /proc/net/dev" % devname):
+                critical('%s No such net interface: %s' % (server, devname))
+        if devname not in inferfaces:
+            inferfaces[devname] = []
+        inferfaces[devname].append(ip)
 
     for ip in servers_clients:
         client = servers_clients[ip]
         ret = client.execute_command('cat /proc/sys/fs/aio-max-nr /proc/sys/fs/aio-nr')
@@ -179,9 +221,10 @@
         if ret:
             for total, avail, path in re.findall('(\d+)\s+(\d+)\s+(.+)', ret.stdout):
                 disk[path] = {
-                    'toatl': int(total) << 10,
+                    'total': int(total) << 10,
                     'avail': int(avail) << 10,
-                    'need': 0
+                    'need': 0,
+                    'threshold': 2
                 }
         for path in servers_disk[ip]:
             kp = '/'
@@ -191,16 +234,77 @@
                     kp = p
             need = servers_disk[ip][path]['need']
             if isinstance(need, int):
-                disk[kp]['need'] += disk[kp]['toatl'] * need / 100
+                disk[kp]['need'] += disk[kp]['total'] * need / 100
             else:
-                disk[kp]['need'] += parse_size(need)
+                try:
+                    disk[kp]['need'] += parse_size(need)
+                except:
+                    critical('datafile_size must be an integer')
+                    return
+
+        for path in servers_clog_mount[ip]:
+            kp = '/'
+            for p in disk:
+                if p in path:
+                    if len(p) > len(kp):
+                        kp = p
+            disk[kp]['threshold'] = min(disk[kp]['threshold'], servers_clog_mount[ip][path]['threshold'])
 
         for p in disk:
+            total = disk[p]['total']
             avail = disk[p]['avail']
             need = disk[p]['need']
+            threshold = disk[p]['threshold']
+            if need > 0 and threshold < 2:
+                alert('(%s) clog and data use the same disk (%s)' % (ip, p))
             if need > avail:
-                critical('(%s) %s not enough disk space. (Avail: %s, Need: %s)' % (ip, kp, formate_size(avail), formate_size(need)))
+                critical('(%s) %s not enough disk space. (Avail: %s, Need: %s)' % (ip, p, formate_size(avail), formate_size(need)))
+            elif 1.0 * (total - avail + need) / total > disk[p]['threshold']:
+                msg = '(%s) %s not enough disk space for clog. Use `redo_dir` to set other disk for clog' % (ip, p)
+                msg += ', or reduce the value of `datafile_size`' if need > 0 else '.'
+                critical(msg)
+
+    if success:
+        for ip in servers_net_inferface:
+            if servers_net_inferface[ip].get(None):
+                devinfo = client.execute_command('cat /proc/net/dev').stdout
+                interfaces = []
+                for interface in re.findall('\n\s+(\w+):', devinfo):
+                    if interface != 'lo':
+                        interfaces.append(interface)
+                if not interfaces:
+                    interfaces = ['lo']
+                if len(interfaces) > 1:
+                    servers = ','.join(str(server) for server in servers_net_inferface[ip][None])
+                    critical('%s has more than one network inferface. Please set `devname` for (%s)' % (ip, servers))
+                else:
+                    servers_net_inferface[ip][interfaces[0]] = servers_net_inferface[ip][None]
+                    del servers_net_inferface[ip][None]
+
+    if success:
+        for ip in servers_net_inferface:
+            client = servers_clients[ip]
+            for devname in servers_net_inferface[ip]:
+                if client.is_localhost() and devname != 'lo' or (not client.is_localhost() and devname == 'lo'):
+                    critical('%s %s fail to ping %s. Please check configuration `devname`' % (server, devname, ip))
+                    continue
+                for _ip in servers_clients:
+                    if ip == _ip:
+                        continue
+                    if not client.execute_command('ping -W 1 -c 1 -I %s %s' % (devname, _ip)):
+                        critical('%s %s fail to ping %s. Please check configuration `devname`' % (server, devname, _ip))
+                        break
+
+    if success:
+        times = []
+        for ip in servers_disk:
+            client = servers_clients[ip]
+            times.append(time_delta(client))
+        if times and max(times) - min(times) > 200:
+            critical('Cluster NTP is out of sync')
+
+
+def start_check(plugin_context, strict_check=False, *args, **kwargs):
+    _start_check(plugin_context, strict_check)
     if success:
         stdio.stop_loading('succeed')
         plugin_context.return_true()
......
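`time_delta` estimates each server's clock offset by reading the remote clock over SSH and subtracting the call's round-trip time; `_start_check` then fails if the spread of offsets across servers exceeds 200 ms. The same arithmetic as a standalone sketch, with the remote read stubbed by a local call:

```python
import time

def clock_offset_ms(read_remote_ns):
    # read_remote_ns() stands in for the plugin's
    # client.execute_command('date +%s%N') over SSH.
    t0 = time.time() * 1000            # local ms before the call
    remote = read_remote_ns() / 1e6    # remote ns -> ms
    t1 = time.time() * 1000            # local ms after the call
    remote -= (t1 - t0)                # subtract the round-trip time
    return remote - t0                 # >0 means the remote clock is ahead

offsets = [clock_offset_ms(time.time_ns) for _ in range(3)]
assert max(offsets) - min(offsets) < 200, 'Cluster NTP is out of sync'
```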
@@ -139,7 +139,10 @@ class SshClient(object):
         return '%s@%s:%d' % (self.config.username, self.config.host, self.config.port)
 
     def _is_local(self):
-        return self.config.host in ['127.0.0.1', 'localhost'] and self.config.username == getpass.getuser()
+        return self.is_localhost() and self.config.username == getpass.getuser()
+
+    def is_localhost(self, stdio=None):
+        return self.config.host in ['127.0.0.1', 'localhost', '127.1', '127.0.1']
 
     def _login(self, stdio=None):
         if self.is_connected:
......