Commit fe44cd94 — authored by LINxiansheng, committed by LINGuanRen

public upgrade py

Parent commit: 056c5a28
......@@ -17,10 +17,10 @@ list(APPEND CPACK_RPM_EXCLUDE_FROM_AUTO_FILELIST_ADDITION "/home/admin/oceanbase
# CPack RPM packaging metadata for the oceanbase-ce package.
# NOTE(review): this chunk is diff residue from the commit export — both the
# old (3.1.1 / patch 1) and new (3.1.2 / patch 2) version lines appear below.
# In the real file only one set() per variable survives; when both are
# present the later call wins, so 3.1.2 / patch 2 is effective.
set(CPACK_PACKAGE_NAME "oceanbase-ce")
set(CPACK_PACKAGE_DESCRIPTION_SUMMARY "OceanBase CE is a distributed relational database")
set(CPACK_PACKAGE_VENDOR "Ant Group CO., Ltd.")
set(CPACK_PACKAGE_VERSION 3.1.1)
set(CPACK_PACKAGE_VERSION 3.1.2)
set(CPACK_PACKAGE_VERSION_MAJOR 3)
set(CPACK_PACKAGE_VERSION_MINOR 1)
set(CPACK_PACKAGE_VERSION_PATCH 1)
set(CPACK_PACKAGE_VERSION_PATCH 2)
set(CPACK_RPM_PACKAGE_GROUP "Applications/Databases")
set(CPACK_RPM_PACKAGE_URL "https://open.oceanbase.com")
set(CPACK_RPM_PACKAGE_DESCRIPTION "OceanBase CE is a distributed relational database")
......
此差异已折叠。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Output file names shared by the upgrade scripts (imported as `config`).
# Files produced by upgrade_pre.py: its log, the dump of every SQL it would
# run, and the rollback SQL for manual recovery.
pre_upgrade_log_filename = 'upgrade_pre.log'
pre_upgrade_sql_filename = 'upgrade_sql_pre.txt'
pre_upgrade_rollback_sql_filename = 'rollback_sql_pre.txt'
# Same three files for upgrade_post.py.
post_upgrade_log_filename = 'upgrade_post.log'
post_upgrade_sql_filename = 'upgrade_sql_post.txt'
post_upgrade_rollback_sql_filename = 'rollback_sql_post.txt'
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import sys
import mysql.connector
from mysql.connector import errorcode
import logging
import json
import config
import opts
import run_modules
import actions
import normal_ddl_actions_post
import normal_dml_actions_post
import each_tenant_dml_actions_post
import each_tenant_ddl_actions_post
import special_upgrade_action_post
# 由于用了/*+read_consistency(WEAK) */来查询,因此升级期间不能允许创建或删除租户
class UpgradeParams:
    """Output file names for the post-upgrade (upgrade_post.py) run."""
    # All three come from config.py so the pre/post stages stay consistent.
    log_filename = config.post_upgrade_log_filename
    sql_dump_filename = config.post_upgrade_sql_filename
    rollback_sql_filename = config.post_upgrade_rollback_sql_filename
def config_logging_module(log_filenamme):
    """Configure root logging: INFO+ to *log_filenamme* (truncated) and to stdout.

    The same message/date format is used for both destinations so the on-disk
    log and the console output line up.
    """
    fmt = '[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s'
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=fmt,
                        datefmt=datefmt,
                        filename=log_filenamme,
                        filemode='w')
    # Mirror INFO-and-above to sys.stdout so operators see progress live.
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter(fmt, datefmt))
    logging.getLogger('').addHandler(console)
def dump_sql_to_file(dump_filename, tenant_id_list):
    """Write every SQL statement the post-upgrade stage would run to *dump_filename*.

    The dump is reference material for manual recovery when upgrade_post.py
    fails; it covers normal ddl, normal dml and each-tenant dml, plus a note
    on how to run the special actions.
    """
    normal_ddls_str = normal_ddl_actions_post.get_normal_ddl_actions_sqls_str()
    normal_dmls_str = normal_dml_actions_post.get_normal_dml_actions_sqls_str()
    each_tenant_dmls_str = each_tenant_dml_actions_post.get_each_tenant_dml_actions_sqls_str(tenant_id_list)
    # BUGFIX: use a with-block so the file handle is closed even when a
    # write raises (the original leaked the handle on error).
    with open(dump_filename, 'w') as dump_file:
        dump_file.write('# 以下是upgrade_post.py脚本中的步骤\n')
        dump_file.write('# 仅供upgrade_post.py脚本运行失败需要人肉的时候参考\n')
        dump_file.write('\n\n')
        dump_file.write('# normal ddl\n')
        dump_file.write(normal_ddls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# normal dml\n')
        dump_file.write(normal_dmls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# each tenant dml\n')
        dump_file.write(each_tenant_dmls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# do special upgrade actions\n')
        dump_file.write('# please run ./upgrade_post.py -h [host] -P [port] -u [user] -p [password] -m special_action\n')
        dump_file.write('\n\n')
def check_before_upgrade(query_cur, upgrade_params):
    """Run all pre-flight checks for the post-upgrade stage.

    Currently the only check is that every observer reports the same build
    version (see check_server_version below).
    """
    check_server_version(query_cur)
    return
# Running the POST script while old and new binaries co-exist can get newly
# created tenant-level system tables GC'ed by a majority of old-binary
# replicas; guard against an OCP flow that would run POST in that state.
def check_server_version(query_cur):
    """Raise MyError unless every server reports the same build_version prefix."""
    sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server"""
    (desc, results) = query_cur.exec_query(sql)
    if 1 != len(results):
        raise MyError("servers build_version not match")
    logging.info("check server version success")
def print_stats():
    """Log a framed summary of the SQL that ran and the SQL that was committed."""
    log = logging.info
    log('==================================================================================')
    log('============================== STATISTICS BEGIN ==================================')
    log('==================================================================================')
    log('succeed run sql(except sql of special actions): \n\n%s\n', actions.get_succ_sql_list_str())
    log('commited sql(except sql of special actions): \n\n%s\n', actions.get_commit_sql_list_str())
    log('==================================================================================')
    log('=============================== STATISTICS END ===================================')
    log('==================================================================================')
def do_upgrade(my_host, my_port, my_user, my_passwd, my_module_set, upgrade_params):
    """Connect to the 'oceanbase' database and run the selected post-upgrade
    modules in order: normal ddl, each-tenant ddl, normal dml, each-tenant
    dml, special actions.

    Statistics are always printed and the rollback SQL is always dumped to
    upgrade_params.rollback_sql_filename, even when a step fails.  Raises on
    any check or SQL failure; the caller decides how to recover.
    """
    try:
        conn = mysql.connector.connect(user = my_user,
                                       password = my_passwd,
                                       host = my_host,
                                       port = my_port,
                                       database = 'oceanbase',
                                       raise_on_warnings = True)
        cur = conn.cursor(buffered=True)
        try:
            query_cur = actions.QueryCursor(cur)
            # checks before starting the upgrade (all servers on one version)
            check_before_upgrade(query_cur, upgrade_params)
            # fetch the tenant id list; an empty cluster is treated as an error
            tenant_id_list = actions.fetch_tenant_ids(query_cur)
            if len(tenant_id_list) <= 0:
                logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
                raise MyError('no tenant id')
            logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
            conn.commit()
            actions.refresh_commit_sql_list()
            # dump every sql this script would run, for manual recovery use
            dump_sql_to_file(upgrade_params.sql_dump_filename, tenant_id_list)
            logging.info('================succeed to dump sql to file: {0}==============='.format(upgrade_params.sql_dump_filename))
            if run_modules.MODULE_DDL in my_module_set:
                logging.info('================begin to run ddl===============')
                # ddl runs with autocommit on; restore it afterwards
                conn.autocommit = True
                normal_ddl_actions_post.do_normal_ddl_actions(cur)
                logging.info('================succeed to run ddl===============')
                conn.autocommit = False
            if run_modules.MODULE_EACH_TENANT_DDL in my_module_set:
                # NOTE(review): has_run_ddl is never read afterwards — dead assignment
                has_run_ddl = True
                logging.info('================begin to run each tenant ddl===============')
                conn.autocommit = True
                each_tenant_ddl_actions_post.do_each_tenant_ddl_actions(cur, tenant_id_list)
                logging.info('================succeed to run each tenant ddl===============')
                conn.autocommit = False
            if run_modules.MODULE_NORMAL_DML in my_module_set:
                logging.info('================begin to run normal dml===============')
                normal_dml_actions_post.do_normal_dml_actions(cur)
                logging.info('================succeed to run normal dml===============')
                conn.commit()
                actions.refresh_commit_sql_list()
                logging.info('================succeed to commit dml===============')
            if run_modules.MODULE_EACH_TENANT_DML in my_module_set:
                logging.info('================begin to run each tenant dml===============')
                conn.autocommit = True
                each_tenant_dml_actions_post.do_each_tenant_dml_actions(cur, tenant_id_list)
                conn.autocommit = False
                logging.info('================succeed to run each tenant dml===============')
            if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
                logging.info('================begin to run special action===============')
                conn.autocommit = True
                special_upgrade_action_post.do_special_upgrade(conn, cur, tenant_id_list, my_user, my_passwd)
                conn.autocommit = False
                actions.refresh_commit_sql_list()
                logging.info('================succeed to commit special action===============')
        except Exception, e:
            logging.exception('run error')
            raise e
        finally:
            # always print statistics ...
            print_stats()
            # ... and dump the rollback sql for manual recovery
            actions.dump_rollback_sql_to_file(upgrade_params.rollback_sql_filename)
            cur.close()
            conn.close()
    except mysql.connector.Error, e:
        logging.exception('connection error')
        raise e
    except Exception, e:
        logging.exception('normal error')
        raise e
def do_upgrade_by_argv(argv):
    """Entry point: parse *argv*, configure logging and run do_upgrade.

    Local-only options (e.g. help) are handled without touching the
    database; otherwise the db client options are validated first.
    """
    upgrade_params = UpgradeParams()
    opts.change_opt_defult_value('log-file', upgrade_params.log_filename)
    opts.parse_options(argv)
    if not opts.has_no_local_opts():
        opts.deal_with_local_opts()
    else:
        opts.check_db_client_opts()
        log_filename = opts.get_opt_log_file()
        upgrade_params.log_filename = log_filename
        # configure logging only after option handling so the steps above
        # cannot clobber the log file
        config_logging_module(upgrade_params.log_filename)
        try:
            host = opts.get_opt_host()
            port = int(opts.get_opt_port())
            user = opts.get_opt_user()
            password = opts.get_opt_password()
            cmd_module_str = opts.get_opt_module()
            # expand the comma-separated -m value; 'all' selects every module
            module_set = set([])
            all_module_set = run_modules.get_all_module_set()
            cmd_module_list = cmd_module_str.split(',')
            for cmd_module in cmd_module_list:
                if run_modules.ALL_MODULE == cmd_module:
                    module_set = module_set | all_module_set
                elif cmd_module in all_module_set:
                    module_set.add(cmd_module)
                else:
                    raise MyError('invalid module: {0}'.format(cmd_module))
            # NOTE(review): the password is logged in clear text here
            logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", module=\"%s\", log-file=\"%s\"',\
                host, port, user, password, module_set, log_filename)
            do_upgrade(host, port, user, password, module_set, upgrade_params)
        except mysql.connector.Error, e:
            logging.exception('mysql connctor error')
            logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
            raise e
        except Exception, e:
            logging.exception('normal error')
            logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
            raise e
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import sys
import mysql.connector
from mysql.connector import errorcode
import logging
import config
import opts
import run_modules
import actions
import normal_ddl_actions_pre
import normal_dml_actions_pre
import each_tenant_dml_actions_pre
import upgrade_sys_vars
import special_upgrade_action_pre
# 由于用了/*+read_consistency(WEAK) */来查询,因此升级期间不能允许创建或删除租户
class UpgradeParams:
    """Output file names for the pre-upgrade (upgrade_pre.py) run."""
    # All three come from config.py so the pre/post stages stay consistent.
    log_filename = config.pre_upgrade_log_filename
    sql_dump_filename = config.pre_upgrade_sql_filename
    rollback_sql_filename = config.pre_upgrade_rollback_sql_filename
def config_logging_module(log_filenamme):
    """Configure root logging: INFO+ to *log_filenamme* (truncated) and to stdout.

    The same message/date format is used for both destinations so the on-disk
    log and the console output line up.
    """
    fmt = '[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s'
    datefmt = '%Y-%m-%d %H:%M:%S'
    logging.basicConfig(level=logging.INFO,
                        format=fmt,
                        datefmt=datefmt,
                        filename=log_filenamme,
                        filemode='w')
    # Mirror INFO-and-above to sys.stdout so operators see progress live.
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter(fmt, datefmt))
    logging.getLogger('').addHandler(console)
def dump_sql_to_file(cur, query_cur, dump_filename, tenant_id_list, update_sys_var_list, add_sys_var_list):
    """Write every SQL statement the pre-upgrade stage would run to *dump_filename*.

    The dump is reference material for manual recovery when upgrade_pre.py
    fails; it covers normal ddl/dml, each-tenant dml and the system-variable
    upgrade dmls, plus a note on how to run the special actions.
    """
    normal_ddls_str = normal_ddl_actions_pre.get_normal_ddl_actions_sqls_str(query_cur)
    normal_dmls_str = normal_dml_actions_pre.get_normal_dml_actions_sqls_str()
    each_tenant_dmls_str = each_tenant_dml_actions_pre.get_each_tenant_dml_actions_sqls_str(tenant_id_list)
    sys_vars_upgrade_dmls_str = upgrade_sys_vars.get_sys_vars_upgrade_dmls_str(cur, query_cur, tenant_id_list, update_sys_var_list, add_sys_var_list)
    # BUGFIX: use a with-block so the file handle is closed even when a
    # write raises (the original leaked the handle on error).
    with open(dump_filename, 'w') as dump_file:
        dump_file.write('# 以下是upgrade_pre.py脚本中的步骤\n')
        dump_file.write('# 仅供upgrade_pre.py脚本运行失败需要人肉的时候参考\n')
        dump_file.write('\n\n')
        dump_file.write('# normal ddl\n')
        dump_file.write(normal_ddls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# normal dml\n')
        dump_file.write(normal_dmls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# each tenant dml\n')
        dump_file.write(each_tenant_dmls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# upgrade sys vars\n')
        dump_file.write(sys_vars_upgrade_dmls_str + '\n')
        dump_file.write('\n\n')
        dump_file.write('# do special upgrade actions\n')
        dump_file.write('# please run ./upgrade_pre.py -h [host] -P [port] -u [user] -p [password] -m special_action\n')
        dump_file.write('\n\n')
def check_before_upgrade(query_cur, upgrade_params):
    """Pre-upgrade sanity hook; intentionally a no-op for the pre stage."""
    pass
def print_stats():
    """Log a framed summary of the SQL that ran and the SQL that was committed."""
    log = logging.info
    log('==================================================================================')
    log('============================== STATISTICS BEGIN ==================================')
    log('==================================================================================')
    log('succeed run sql(except sql of special actions): \n\n%s\n', actions.get_succ_sql_list_str())
    log('commited sql(except sql of special actions): \n\n%s\n', actions.get_commit_sql_list_str())
    log('==================================================================================')
    log('=============================== STATISTICS END ===================================')
    log('==================================================================================')
def do_upgrade(my_host, my_port, my_user, my_passwd, my_module_set, upgrade_params):
    """Connect to the 'oceanbase' database and run the selected pre-upgrade
    modules in order: normal ddl, normal dml, each-tenant dml, system
    variable dml, special actions.

    Also computes the system variables that must be added/updated and dumps
    every SQL to upgrade_params.sql_dump_filename up front.  Statistics and
    the rollback SQL file are always produced, even when a step fails.
    """
    try:
        conn = mysql.connector.connect(user = my_user,
                                       password = my_passwd,
                                       host = my_host,
                                       port = my_port,
                                       database = 'oceanbase',
                                       raise_on_warnings = True)
        cur = conn.cursor(buffered=True)
        try:
            query_cur = actions.QueryCursor(cur)
            # checks before starting the upgrade
            check_before_upgrade(query_cur, upgrade_params)
            # get min_observer_version
            version = actions.fetch_observer_version(query_cur)
            # NOTE(review): cmp() is a Python-2-only builtin; also this flag
            # is not read again inside this function — confirm whether it is
            # dead or consumed by code outside this view
            need_check_standby_cluster = cmp(version, '2.2.40') >= 0
            # fetch the tenant id list; an empty cluster is treated as an error
            tenant_id_list = actions.fetch_tenant_ids(query_cur)
            if len(tenant_id_list) <= 0:
                logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
                raise MyError('no tenant id')
            logging.info('there has %s distinct tenant ids: [%s]', len(tenant_id_list), ','.join(str(tenant_id) for tenant_id in tenant_id_list))
            # compute the system variables that must be added or updated
            conn.commit()
            conn.autocommit = True
            (update_sys_var_list, update_sys_var_ori_list, add_sys_var_list) = upgrade_sys_vars.calc_diff_sys_var(cur, tenant_id_list[0])
            dump_sql_to_file(cur, query_cur, upgrade_params.sql_dump_filename, tenant_id_list, update_sys_var_list, add_sys_var_list)
            conn.autocommit = False
            conn.commit()
            logging.info('update system variables list: [%s]', ', '.join(str(sv) for sv in update_sys_var_list))
            logging.info('update system variables original list: [%s]', ', '.join(str(sv) for sv in update_sys_var_ori_list))
            logging.info('add system variables list: [%s]', ', '.join(str(sv) for sv in add_sys_var_list))
            logging.info('================succeed to dump sql to file: {0}==============='.format(upgrade_params.sql_dump_filename))
            if run_modules.MODULE_DDL in my_module_set:
                logging.info('================begin to run ddl===============')
                # ddl runs with autocommit on; restore it afterwards
                conn.autocommit = True
                normal_ddl_actions_pre.do_normal_ddl_actions(cur)
                logging.info('================succeed to run ddl===============')
                conn.autocommit = False
            if run_modules.MODULE_NORMAL_DML in my_module_set:
                logging.info('================begin to run normal dml===============')
                normal_dml_actions_pre.do_normal_dml_actions(cur)
                logging.info('================succeed to run normal dml===============')
                conn.commit()
                actions.refresh_commit_sql_list()
                logging.info('================succeed to commit dml===============')
            if run_modules.MODULE_EACH_TENANT_DML in my_module_set:
                logging.info('================begin to run each tenant dml===============')
                conn.autocommit = True
                each_tenant_dml_actions_pre.do_each_tenant_dml_actions(cur, tenant_id_list)
                conn.autocommit = False
                logging.info('================succeed to run each tenant dml===============')
            # update system variables
            if run_modules.MODULE_SYSTEM_VARIABLE_DML in my_module_set:
                logging.info('================begin to run system variable dml===============')
                conn.autocommit = True
                upgrade_sys_vars.exec_sys_vars_upgrade_dml(cur, tenant_id_list)
                conn.autocommit = False
                logging.info('================succeed to run system variable dml===============')
            if run_modules.MODULE_SPECIAL_ACTION in my_module_set:
                logging.info('================begin to run special action===============')
                conn.autocommit = True
                special_upgrade_action_pre.do_special_upgrade(conn, cur, tenant_id_list, my_user, my_passwd)
                conn.autocommit = False
                actions.refresh_commit_sql_list()
                logging.info('================succeed to commit special action===============')
        except Exception, e:
            logging.exception('run error')
            raise e
        finally:
            # always print statistics ...
            print_stats()
            # ... and dump the rollback sql for manual recovery
            actions.dump_rollback_sql_to_file(upgrade_params.rollback_sql_filename)
            cur.close()
            conn.close()
    except mysql.connector.Error, e:
        logging.exception('connection error')
        raise e
    except Exception, e:
        logging.exception('normal error')
        raise e
def do_upgrade_by_argv(argv):
    """Entry point: parse *argv*, configure logging and run do_upgrade.

    Local-only options (e.g. help) are handled without touching the
    database; otherwise the db client options are validated first.
    """
    upgrade_params = UpgradeParams()
    opts.change_opt_defult_value('log-file', upgrade_params.log_filename)
    opts.parse_options(argv)
    if not opts.has_no_local_opts():
        opts.deal_with_local_opts()
    else:
        opts.check_db_client_opts()
        log_filename = opts.get_opt_log_file()
        upgrade_params.log_filename = log_filename
        # configure logging only after option handling so the steps above
        # cannot clobber the log file
        config_logging_module(upgrade_params.log_filename)
        try:
            host = opts.get_opt_host()
            port = int(opts.get_opt_port())
            user = opts.get_opt_user()
            password = opts.get_opt_password()
            cmd_module_str = opts.get_opt_module()
            # expand the comma-separated -m value; 'all' selects every module
            module_set = set([])
            all_module_set = run_modules.get_all_module_set()
            cmd_module_list = cmd_module_str.split(',')
            for cmd_module in cmd_module_list:
                if run_modules.ALL_MODULE == cmd_module:
                    module_set = module_set | all_module_set
                elif cmd_module in all_module_set:
                    module_set.add(cmd_module)
                else:
                    raise MyError('invalid module: {0}'.format(cmd_module))
            # NOTE(review): the password is logged in clear text here
            logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", module=\"%s\", log-file=\"%s\"',\
                host, port, user, password, module_set, log_filename)
            do_upgrade(host, port, user, password, module_set, upgrade_params)
        except mysql.connector.Error, e:
            logging.exception('mysql connctor error')
            logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
            raise e
        except Exception, e:
            logging.exception('normal error')
            logging.exception('run error, maybe you can reference ' + upgrade_params.rollback_sql_filename + ' to rollback it')
            raise e
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
from actions import BaseEachTenantDDLAction
from actions import reflect_action_cls_list
from actions import fetch_observer_version
from actions import QueryCursor
import logging
import time
import my_utils
import actions
'''
添加一条each tenant ddl的方法:
在本文件中,添加一个类名以"EachTenantDDLActionPost"开头并且继承自BaseEachTenantDDLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)dump_before_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之前把一些相关数据dump到日志中。
(5)check_before_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之前的检查。
(6)@staticmethod get_each_tenant_action_ddl(tenant_id):
返回用参数tenant_id拼成的一条action sql,并且该sql必须为ddl。
(7)@staticmethod get_each_tenant_rollback_sql(tenant_id):
返回一条sql,用于回滚get_each_tenant_action_ddl(tenant_id)返回的sql。
(8)dump_after_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之后把一些相关数据dump到日志中。
(9)check_after_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之后的检查。
(10)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(11)check_after_do_action(self):
执行action sql之后的检查。
(12)skip_pre_check(self):
check if check_before_do_action() can be skipped
(13)skip_each_tenant_action(self):
check if check_before_do_each_tenant_action() and do_each_tenant_action() can be skipped
举例: 以下为schema拆分后加租户级系统表的示例
class EachTenantDDLActionPostCreateAllTenantBackupBackupLogArchiveStatus(BaseEachTenantDDLAction):
table_name = '__all_tenant_backup_backup_log_archive_status'
@staticmethod
def get_seq_num():
return 24
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select tenant_id, table_id, table_name from {0} where table_name = '{1}'""".format(self.get_all_table_name(), self.table_name))
def skip_pre_check(self):
return True
def skip_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select tenant_id, table_id, table_name from {0} where table_name = '{1}' and tenant_id = {2}""".format(self.get_all_table_name(), self.table_name, tenant_id))
return (1 == len(results))
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select tenant_id, table_id, table_name from {0} where table_name = '{1}'""".format(self.get_all_table_name(), self.table_name))
if len(results) > 0:
raise MyError("""{0} already created""".format(self.table_name))
def dump_before_do_each_tenant_action(self, tenant_id):
my_utils.query_and_dump_results(self._query_cursor, """select tenant_id, table_id, table_name from {0} where table_name = '{1}' and tenant_id = {2}""".format(self.get_all_table_name(), self.table_name, tenant_id))
def check_before_do_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select tenant_id, table_id, table_name from {0} where table_name = '{1}' and tenant_id = {2}""".format(self.get_all_table_name(), self.table_name, tenant_id))
if len(results) > 0:
raise MyError("""tenant_id:{0} has already create table {1}""".format(tenant_id, self.table_name))
@staticmethod
def get_each_tenant_action_ddl(tenant_id):
pure_table_id = 303
table_id = (tenant_id << 40) | pure_table_id
return """CREATE TABLE `__all_tenant_backup_backup_log_archive_status` (
`gmt_create` timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6),
`gmt_modified` timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
`tenant_id` bigint(20) NOT NULL,
`incarnation` bigint(20) NOT NULL,
`log_archive_round` bigint(20) NOT NULL,
`copy_id` bigint(20) NOT NULL,
`min_first_time` timestamp(6) NOT NULL,
`max_next_time` timestamp(6) NOT NULL,
`input_bytes` bigint(20) NOT NULL DEFAULT '0',
`output_bytes` bigint(20) NOT NULL DEFAULT '0',
`deleted_input_bytes` bigint(20) NOT NULL DEFAULT '0',
`deleted_output_bytes` bigint(20) NOT NULL DEFAULT '0',
`pg_count` bigint(20) NOT NULL DEFAULT '0',
`status` varchar(64) NOT NULL DEFAULT '',
PRIMARY KEY (`tenant_id`, `incarnation`, `log_archive_round`, `copy_id`)
) TABLE_ID={0} DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'none' REPLICA_NUM = 1 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10 TABLEGROUP = 'oceanbase'
""".format(table_id)
@staticmethod
def get_each_tenant_rollback_sql(tenant_id):
return """select 1"""
def dump_after_do_each_tenant_action(self, tenant_id):
my_utils.query_and_dump_results(self._query_cursor, """select tenant_id, table_id, table_name from {0} where table_name = '{1}' and tenant_id = {2}""".format(self.get_all_table_name(), self.table_name, tenant_id))
def check_after_do_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select tenant_id, table_id, table_name from {0} where table_name = '{1}' and tenant_id = {2}""".format(self.get_all_table_name(), self.table_name, tenant_id))
if len(results) != 1:
raise MyError("""tenant_id:{0} create table {1} failed""".format(tenant_id, self.table_name))
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select tenant_id, table_id, table_name from {0} where table_name = '{1}'""".format(self.get_all_table_name(), self.table_name))
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select tenant_id, table_id, table_name from {0} where table_name = '{1}'""".format(self.get_all_table_name(), self.table_name))
if len(results) != len(self.get_tenant_id_list()):
raise MyError("""there should be {0} rows in {1} whose table_name is {2}, but there has {3} rows like that""".format(len(self.get_tenant_id_list()), self.get_all_table_name(), self.table_name, len(results)))
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def do_each_tenant_ddl_actions(cur, tenant_id_list):
    """Run every EachTenantDDLActionPost* action for every tenant.

    Temporarily enables sys-table ddl (and ddl itself, if it was off) while
    the actions run, and restores the original settings afterwards.
    """
    import each_tenant_ddl_actions_post
    # tenant-level system tables are not exposed through a virtual table, so
    # pick the physical table to query based on the observer version
    query_cur = QueryCursor(cur)
    version = fetch_observer_version(query_cur)
    all_table_name = "__all_table"
    # NOTE(review): cmp() is a Python-2-only builtin
    if (cmp(version, "2.2.60") >= 0) :
        all_table_name = "__all_table_v2"
    cls_list = reflect_action_cls_list(each_tenant_ddl_actions_post, 'EachTenantDDLActionPost')
    # set parameter: allow ddl on system tables while the actions run
    if len(cls_list) > 0:
        actions.set_parameter(cur, 'enable_sys_table_ddl' , 'True')
        ori_enable_ddl = actions.get_ori_enable_ddl(cur)
        if ori_enable_ddl == 0:
            actions.set_parameter(cur, 'enable_ddl', 'True')
    for cls in cls_list:
        logging.info('do each tenant ddl acion, seq_num: %d', cls.get_seq_num())
        action = cls(cur, tenant_id_list)
        action.set_all_table_name(all_table_name)
        action.dump_before_do_action()
        if False == action.skip_pre_check():
            action.check_before_do_action()
        else:
            logging.info("skip pre check. seq_num: %d", cls.get_seq_num())
        # creating a tenant-level system table in the sys tenant overrides the
        # normal tenants' copies, so the sys tenant must be handled last
        for tenant_id in action.get_tenant_id_list():
            action.dump_before_do_each_tenant_action(tenant_id)
            if False == action.skip_each_tenant_action(tenant_id):
                action.check_before_do_each_tenant_action(tenant_id)
                action.do_each_tenant_action(tenant_id)
            else:
                logging.info("skip each tenant ddl action, seq_num: %d, tenant_id: %d", cls.get_seq_num(), tenant_id)
            action.dump_after_do_each_tenant_action(tenant_id)
            action.check_after_do_each_tenant_action(tenant_id)
        action.dump_after_do_action()
        action.check_after_do_action()
    # reset parameter: restore the original ddl settings
    if len(cls_list) > 0:
        if ori_enable_ddl == 0:
            actions.set_parameter(cur, 'enable_ddl' , 'False')
        actions.set_parameter(cur, 'enable_sys_table_ddl' , 'False')
def get_each_tenant_ddl_actions_sqls_str(tenant_id_list):
    """Return all each-tenant DDL action sqls, ';'-terminated, newline-joined.

    One sql per (action class, tenant id) pair, actions in seq order.
    """
    import each_tenant_ddl_actions_post
    cls_list = reflect_action_cls_list(each_tenant_ddl_actions_post, 'EachTenantDDLActionPost')
    sqls = []
    for cls in cls_list:
        for tenant_id in tenant_id_list:
            sqls.append(cls.get_each_tenant_action_ddl(tenant_id) + ';')
    return '\n'.join(sqls)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import mysql.connector
from mysql.connector import errorcode
from actions import BaseEachTenantDMLAction
from actions import reflect_action_cls_list
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import logging
import my_utils
'''
添加一条each tenant dml的方法:
在本文件中,添加一个类名以"EachTenantDMLActionPost"开头并且继承自BaseEachTenantDMLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)dump_before_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之前把一些相关数据dump到日志中。
(5)check_before_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之前的检查。
(6)@staticmethod get_each_tenant_action_dml(tenant_id):
返回用参数tenant_id拼成的一条action sql,并且该sql必须为dml。
(7)@staticmethod get_each_tenant_rollback_sql(tenant_id):
返回一条sql,用于回滚get_each_tenant_action_dml(tenant_id)返回的sql。
(8)dump_after_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之后把一些相关数据dump到日志中。
(9)check_after_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之后的检查。
(10)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(11)check_after_do_action(self):
执行action sql之后的检查。
(12)skip_pre_check(self):
check if check_before_do_action() can be skipped
(13)skip_each_tenant_action(self):
check if check_before_do_each_tenant_action() and do_each_tenant_action() can be skipped
举例:
class EachTenantDMLActionPost1(BaseEachTenantDMLAction):
@staticmethod
def get_seq_num():
return 0
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1""")
def skip_pre_check(self):
return True
def skip_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where c2 = 9494""")
return (len(results) > 0)
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where c2 = 9494""")
if len(results) > 0:
raise MyError('some rows in table test.for_test1 whose c2 column is 9494 already exists')
def dump_before_do_each_tenant_action(self, tenant_id):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1 where c2 = 9494""")
def check_before_do_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where pk = {0}""".format(tenant_id))
if len(results) > 0:
raise MyError('some rows in table test.for_test1 whose pk is {0} already exists'.format(tenant_id))
@staticmethod
def get_each_tenant_action_dml(tenant_id):
return """insert into test.for_test1 value ({0}, 'for test 1', 9494)""".format(tenant_id)
@staticmethod
def get_each_tenant_rollback_sql(tenant_id):
return """delete from test.for_test1 where pk = {0}""".format(tenant_id)
def dump_after_do_each_tenant_action(self, tenant_id):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1 where c2 = 9494""")
def check_after_do_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where pk = {0}""".format(tenant_id))
if len(results) != 1:
raise MyError('there should be only one row whose primary key is {0} in table test.for_test1, but there has {1} rows like that'.format(tenant_id, len(results)))
elif results[0][0] != tenant_id or results[0][1] != 'for test 1' or results[0][2] != 9494:
raise MyError('the row that has been inserted is not expected, it is: [{0}]'.format(','.join(str(r) for r in results[0])))
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where c2 = 9494""")
if len(results) != len(self.get_tenant_id_list()):
raise MyError('there should be {0} rows whose c2 column is 9494 in table test.for_test1, but there has {1} rows like that'.format(len(self.get_tenant_id_list()), len(results)))
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def get_actual_tenant_id(tenant_id):
    """Map the sys tenant id (1) to itself and every other tenant id to 0."""
    if 1 == tenant_id:
        return tenant_id
    return 0
def do_each_tenant_dml_actions_by_standby_cluster(standby_cluster_infos):
    """Run the each-tenant DML actions (sys tenant only) on every standby cluster.

    standby_cluster_infos: iterable of dicts with keys 'cluster_id', 'ip',
    'port', 'user', 'pwd'.  Raises if a connection fails or if a supposed
    standby answers as primary (a switchover happened mid-upgrade).
    """
    try:
        # only the sys tenant (id 1) is written from the standby side
        tenant_id_list = [1]
        for standby_cluster_info in standby_cluster_infos:
            logging.info("do_each_tenant_dml_actions_by_standby_cluster: cluster_id = {0}, ip = {1}, port = {2}"
                         .format(standby_cluster_info['cluster_id'],
                                 standby_cluster_info['ip'],
                                 standby_cluster_info['port']))
            logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
                         .format(standby_cluster_info['cluster_id'],
                                 standby_cluster_info['ip'],
                                 standby_cluster_info['port']))
            conn = mysql.connector.connect(user = standby_cluster_info['user'],
                                           password = standby_cluster_info['pwd'],
                                           host = standby_cluster_info['ip'],
                                           port = standby_cluster_info['port'],
                                           database = 'oceanbase',
                                           raise_on_warnings = True)
            try:
                cur = conn.cursor(buffered=True)
                try:
                    conn.autocommit = True
                    query_cur = QueryCursor(cur)
                    is_primary = check_current_cluster_is_primary(query_cur)
                    if is_primary:
                        # was logging.exception outside an except block; use
                        # logging.error since there is no active exception
                        logging.error("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
                                      .format(standby_cluster_info['cluster_id'],
                                              standby_cluster_info['ip'],
                                              standby_cluster_info['port']))
                        # BUGFIX: the original did `raise e` here, but no `e`
                        # is bound at this point, so it raised a NameError
                        # that masked the real problem
                        raise MyError("primary cluster changed during upgrade")
                    ## process
                    do_each_tenant_dml_actions(cur, tenant_id_list)
                finally:
                    # close even when an action fails (original leaked both
                    # the cursor and the connection on error)
                    cur.close()
            finally:
                conn.close()
    except Exception as e:
        logging.exception("""do_each_tenant_dml_actions_by_standby_cluster failed""")
        raise e
def do_each_tenant_dml_actions(cur, tenant_id_list):
  """Run every post-upgrade each-tenant DML action in seq_num order.

  Each action is executed once per tenant; the sys tenant (1) is made
  current before and after each action so dumps/checks see sys data.
  """
  import each_tenant_dml_actions_post
  sys_tenant_id = 1
  action_classes = reflect_action_cls_list(each_tenant_dml_actions_post, 'EachTenantDMLActionPost')
  for action_cls in action_classes:
    logging.info('do each tenant dml acion, seq_num: %d', action_cls.get_seq_num())
    action = action_cls(cur, tenant_id_list)
    action.change_tenant(sys_tenant_id)
    action.dump_before_do_action()
    if action.skip_pre_check() == False:
      action.check_before_do_action()
    else:
      logging.info("skip pre check. seq_num: %d", action_cls.get_seq_num())
    for tenant_id in action.get_tenant_id_list():
      action.change_tenant(tenant_id)
      action.dump_before_do_each_tenant_action(tenant_id)
      if action.skip_each_tenant_action(tenant_id) == False:
        action.check_before_do_each_tenant_action(tenant_id)
        action.do_each_tenant_action(tenant_id)
      else:
        logging.info("skip each tenant dml action, seq_num: %d, tenant_id: %d", action_cls.get_seq_num(), tenant_id)
      action.dump_after_do_each_tenant_action(tenant_id)
      action.check_after_do_each_tenant_action(tenant_id)
    action.change_tenant(sys_tenant_id)
    action.dump_after_do_action()
    action.check_after_do_action()
def get_each_tenant_dml_actions_sqls_str(tenant_id_list):
  """Return every post-upgrade each-tenant DML statement (one per action
  per tenant), each terminated with ';' and joined by newlines."""
  import each_tenant_dml_actions_post
  cls_list = reflect_action_cls_list(each_tenant_dml_actions_post, 'EachTenantDMLActionPost')
  return '\n'.join(cls.get_each_tenant_action_dml(tenant_id) + ';'
                   for cls in cls_list
                   for tenant_id in tenant_id_list)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import mysql.connector
from mysql.connector import errorcode
from actions import BaseEachTenantDMLAction
from actions import reflect_action_cls_list
from actions import fetch_observer_version
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import logging
import my_utils
import actions
import re
'''
添加一条each tenant dml的方法:
在本文件中,添加一个类名以"EachTenantDMLActionPre"开头并且继承自BaseEachTenantDMLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)dump_before_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之前把一些相关数据dump到日志中。
(5)check_before_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之前的检查。
(6)@staticmethod get_each_tenant_action_dml(tenant_id):
返回用参数tenant_id拼成的一条action sql,并且该sql必须为dml。
(7)@staticmethod get_each_tenant_rollback_sql(tenant_id):
返回一条sql,用于回滚get_each_tenant_action_dml(tenant_id)返回的sql。
(8)dump_after_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之后把一些相关数据dump到日志中。
(9)check_after_do_each_tenant_action(self, tenant_id):
执行用参数tenant_id拼成的这条action sql之后的检查。
(10)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(11)check_after_do_action(self):
执行action sql之后的检查。
(12)skip_pre_check(self):
check if check_before_do_action() can be skipped
(13)skip_each_tenant_action(self):
check if check_before_do_each_tenant_action() and do_each_tenant_action() can be skipped
举例:
class EachTenantDMLActionPre1(BaseEachTenantDMLAction):
@staticmethod
def get_seq_num():
return 0
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1""")
def skip_pre_check(self):
return True
def skip_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where c2 = 9494""")
return (len(results) > 0)
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where c2 = 9494""")
if len(results) > 0:
raise MyError('some rows in table test.for_test1 whose c2 column is 9494 already exists')
def dump_before_do_each_tenant_action(self, tenant_id):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1 where c2 = 9494""")
def check_before_do_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where pk = {0}""".format(tenant_id))
if len(results) > 0:
raise MyError('some rows in table test.for_test1 whose pk is {0} already exists'.format(tenant_id))
@staticmethod
def get_each_tenant_action_dml(tenant_id):
return """insert into test.for_test1 value ({0}, 'for test 1', 9494)""".format(tenant_id)
@staticmethod
def get_each_tenant_rollback_sql(tenant_id):
return """delete from test.for_test1 where pk = {0}""".format(tenant_id)
def dump_after_do_each_tenant_action(self, tenant_id):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1 where c2 = 9494""")
def check_after_do_each_tenant_action(self, tenant_id):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where pk = {0}""".format(tenant_id))
if len(results) != 1:
raise MyError('there should be only one row whose primary key is {0} in table test.for_test1, but there has {1} rows like that'.format(tenant_id, len(results)))
elif results[0][0] != tenant_id or results[0][1] != 'for test 1' or results[0][2] != 9494:
raise MyError('the row that has been inserted is not expected, it is: [{0}]'.format(','.join(str(r) for r in results[0])))
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test1""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test1 where c2 = 9494""")
if len(results) != len(self.get_tenant_id_list()):
raise MyError('there should be {0} rows whose c2 column is 9494 in table test.for_test1, but there has {1} rows like that'.format(len(self.get_tenant_id_list()), len(results)))
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def get_actual_tenant_id(tenant_id):
  """Map a tenant id to the id actually recorded: the sys tenant (1) keeps
  its own id, every other tenant is recorded as 0."""
  if 1 == tenant_id:
    return tenant_id
  return 0
def do_each_tenant_dml_actions(cur, tenant_id_list):
  """Run every pre-upgrade each-tenant DML action in seq_num order.

  When the observer version is in [2.2.77, 3.0.0) or >= 3.1.1 each
  action's skip hooks are honoured, making the pre-upgrade script
  re-entrant on those versions.
  """
  import each_tenant_dml_actions_pre
  cls_list = reflect_action_cls_list(each_tenant_dml_actions_pre, 'EachTenantDMLActionPre')
  # check if pre upgrade script can run reentrantly
  version = fetch_observer_version(QueryCursor(cur))
  can_skip = ((cmp(version, "2.2.77") >= 0 and cmp(version, "3.0.0") < 0)
              or cmp(version, "3.1.1") >= 0)
  sys_tenant_id = 1
  for action_cls in cls_list:
    logging.info('do each tenant dml acion, seq_num: %d', action_cls.get_seq_num())
    action = action_cls(cur, tenant_id_list)
    action.change_tenant(sys_tenant_id)
    action.dump_before_do_action()
    if (not can_skip) or action.skip_pre_check() == False:
      action.check_before_do_action()
    else:
      logging.info("skip pre check. seq_num: %d", action_cls.get_seq_num())
    for tenant_id in action.get_tenant_id_list():
      action.change_tenant(tenant_id)
      action.dump_before_do_each_tenant_action(tenant_id)
      if (not can_skip) or action.skip_each_tenant_action(tenant_id) == False:
        action.check_before_do_each_tenant_action(tenant_id)
        action.do_each_tenant_action(tenant_id)
      else:
        logging.info("skip each tenant dml action, seq_num: %d, tenant_id: %d", action_cls.get_seq_num(), tenant_id)
      action.dump_after_do_each_tenant_action(tenant_id)
      action.check_after_do_each_tenant_action(tenant_id)
    action.change_tenant(sys_tenant_id)
    action.dump_after_do_action()
    action.check_after_do_action()
def do_each_tenant_dml_actions_by_standby_cluster(standby_cluster_infos):
try:
tenant_id_list = [1]
for standby_cluster_info in standby_cluster_infos:
logging.info("do_each_tenant_dml_actions_by_standby_cluster: cluster_id = {0}, ip = {1}, port = {2}"
.format(standby_cluster_info['cluster_id'],
standby_cluster_info['ip'],
standby_cluster_info['port']))
logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
.format(standby_cluster_info['cluster_id'],
standby_cluster_info['ip'],
standby_cluster_info['port']))
conn = mysql.connector.connect(user = standby_cluster_info['user'],
password = standby_cluster_info['pwd'],
host = standby_cluster_info['ip'],
port = standby_cluster_info['port'],
database = 'oceanbase',
raise_on_warnings = True)
cur = conn.cursor(buffered=True)
conn.autocommit = True
query_cur = QueryCursor(cur)
is_primary = check_current_cluster_is_primary(query_cur)
if is_primary:
logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
.format(standby_cluster_info['cluster_id'],
standby_cluster_info['ip'],
standby_cluster_info['port']))
raise e
## process
do_each_tenant_dml_actions(cur, tenant_id_list)
cur.close()
conn.close()
except Exception, e:
logging.exception("""do_each_tenant_dml_actions_by_standby_cluster failed""")
raise e
def get_each_tenant_dml_actions_sqls_str(tenant_id_list):
  """Return every pre-upgrade each-tenant DML statement (one per action
  per tenant), each terminated with ';' and joined by newlines."""
  import each_tenant_dml_actions_pre
  cls_list = reflect_action_cls_list(each_tenant_dml_actions_pre, 'EachTenantDMLActionPre')
  return '\n'.join(cls.get_each_tenant_action_dml(tenant_id) + ';'
                   for cls in cls_list
                   for tenant_id in tenant_id_list)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import stat
def get_py_filename_list(except_filter_filename_list):
  """Return the sorted list of '.py' file names in the running script's
  directory, excluding any name in *except_filter_filename_list*.

  Replaces the original manual inner filter loop with a direct `not in`
  membership test.
  """
  script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
  py_filename_list = [filename for filename in os.listdir(script_dir)
                      if filename.endswith('.py')
                      and filename not in except_filter_filename_list]
  py_filename_list.sort()
  return py_filename_list
def get_concat_sub_files_lines(py_filename_list, file_splitter_line, \
    sub_filename_line_prefix, sub_file_module_end_line):
  """Build the merged, commented-out payload lines for the bundle script.

  Each sub file becomes: a splitter line, a filename marker line, then the
  file's own lines each prefixed with '#'.  A synthetic __init__.py entry
  comes first and *sub_file_module_end_line* terminates the payload.
  """
  concat_sub_files_lines = []
  # Synthetic __init__.py so the extracted directory is an importable package.
  concat_sub_files_lines.append(file_splitter_line + '\n')
  concat_sub_files_lines.append(sub_filename_line_prefix + '__init__.py\n')
  concat_sub_files_lines.append('##!/usr/bin/env python\n')
  concat_sub_files_lines.append('## -*- coding: utf-8 -*-\n')
  # Append every real sub file, commenting each of its lines out with '#'.
  for sub_py_filename in py_filename_list:
    # FIX: use a context manager so the handle is closed even if reading
    # fails (the original leaked the handle on error).
    with open(sub_py_filename, 'r') as sub_py_file:
      sub_py_file_lines = sub_py_file.readlines()
    concat_sub_files_lines.append(file_splitter_line + '\n')
    concat_sub_files_lines.append(sub_filename_line_prefix + sub_py_filename + '\n')
    for sub_py_file_line in sub_py_file_lines:
      concat_sub_files_lines.append('#' + sub_py_file_line)
  concat_sub_files_lines.append(file_splitter_line + '\n')
  concat_sub_files_lines.append(sub_file_module_end_line + '\n')
  return concat_sub_files_lines
def gen_upgrade_script(filename, concat_sub_files_lines, extra_lines_str):
  """Write the generated bundle script and mark it read-and-execute only.

  Writes a shebang and coding line, then *concat_sub_files_lines*, a blank
  line, and *extra_lines_str*, then chmods the file to 0555 so the
  generated script is not edited by hand.
  """
  # Make an existing output writable first (a previous run leaves it 0555).
  # FIX: guard with exists() so a missing output file no longer raises.
  if os.path.exists(filename):
    os.chmod(filename, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
                       | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
  # FIX: `with` closes the handle even if a write fails, and the local no
  # longer shadows the builtin `file`.
  with open(filename, 'w') as out_file:
    out_file.write('#!/usr/bin/env python\n')
    out_file.write('# -*- coding: utf-8 -*-\n')
    for concat_sub_files_line in concat_sub_files_lines:
      out_file.write(concat_sub_files_line)
    out_file.write('\n')
    out_file.write(extra_lines_str)
  # Drop the write bit on the finished script.
  os.chmod(filename, stat.S_IRUSR | stat.S_IXUSR
                     | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
def get_main_func_str(run_filename):
  """Return the __main__ footer appended to a generated bundle script.

  The footer extracts the bundle back into per-module files at run time
  (via split_py_files) and then imports do_upgrade_by_argv from
  *run_filename*'s module (the '.py' suffix is stripped to form the
  module name substituted into the template).
  """
  return """
if __name__ == '__main__':
  cur_filename = sys.argv[0][sys.argv[0].rfind(os.sep)+1:]
  (cur_file_short_name,cur_file_ext_name1) = os.path.splitext(sys.argv[0])
  (cur_file_real_name,cur_file_ext_name2) = os.path.splitext(cur_filename)
  sub_files_dir_suffix = '_extract_files_' + datetime.datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') + '_' + random_str()
  sub_files_dir = cur_file_short_name + sub_files_dir_suffix
  sub_files_short_dir = cur_file_real_name + sub_files_dir_suffix
  split_py_files(sub_files_dir)
  exec('from ' + sub_files_short_dir + '.{run_module_name} import do_upgrade_by_argv')
  do_upgrade_by_argv(sys.argv[1:])
""".format(run_module_name = run_filename[0:run_filename.rfind('.')])
def get_pre_and_post_extra_lines_strs(upgrade_pre_filename, upgrade_post_filename, \
    do_upgrade_pre_filename, do_upgrade_post_filename, \
    file_splitter_line, sub_filename_line_prefix, sub_file_module_end_line):
  """Build the self-extracting footer code for the generated bundle scripts.

  Returns (pre_extra_lines_str, post_extra_lines_str): each is the shared
  split/extract helper code below plus a __main__ entry (from
  get_main_func_str) that imports do_upgrade_by_argv from the matching
  do_upgrade_* module.  The marker strings are substituted into the
  template via str.format.
  """
  # NOTE: this template is emitted verbatim into the generated script; the
  # doubled backslash ('\\n') and the {placeholders} are resolved by .format.
  upgrade_common_lines = """
import os
import sys
import datetime
from random import Random
class SplitError(Exception):
  def __init__(self, value):
    self.value = value
  def __str__(self):
    return repr(self.value)
def random_str(rand_str_len = 8):
  str = ''
  chars = 'AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz0123456789'
  length = len(chars) - 1
  random = Random()
  for i in range(rand_str_len):
    str += chars[random.randint(0, length)]
  return str
def split_py_files(sub_files_dir):
  char_enter = '\\n'
  file_splitter_line = '{file_splitter_line}'
  sub_filename_line_prefix = '{sub_filename_line_prefix}'
  sub_file_module_end_line = '{sub_file_module_end_line}'
  os.makedirs(sub_files_dir)
  print('succeed to create run dir: ' + sub_files_dir + char_enter)
  cur_file = open(sys.argv[0], 'r')
  cur_file_lines = cur_file.readlines()
  cur_file_lines_count = len(cur_file_lines)
  sub_file_lines = []
  sub_filename = ''
  begin_read_sub_py_file = False
  is_first_splitter_line = True
  i = 0
  while i < cur_file_lines_count:
    if (file_splitter_line + char_enter) != cur_file_lines[i]:
      if begin_read_sub_py_file:
        sub_file_lines.append(cur_file_lines[i])
    else:
      if is_first_splitter_line:
        is_first_splitter_line = False
      else:
        #读完一个子文件了,写到磁盘中
        sub_file = open(sub_files_dir + '/' + sub_filename, 'w')
        for sub_file_line in sub_file_lines:
          sub_file.write(sub_file_line[1:])
        sub_file.close()
        #清空sub_file_lines
        sub_file_lines = []
      #再读取下一行的文件名或者结束标记
      i += 1
      if i >= cur_file_lines_count:
        raise SplitError('invalid line index:' + str(i) + ', lines_count:' + str(cur_file_lines_count))
      elif (sub_file_module_end_line + char_enter) == cur_file_lines[i]:
        print 'succeed to split all sub py files'
        break
      else:
        mark_idx = cur_file_lines[i].find(sub_filename_line_prefix)
        if 0 != mark_idx:
          raise SplitError('invalid sub file name line, mark_idx = ' + str(mark_idx) + ', line = ' + cur_file_lines[i])
        else:
          sub_filename = cur_file_lines[i][len(sub_filename_line_prefix):-1]
          begin_read_sub_py_file = True
    i += 1
  cur_file.close()
""".format(file_splitter_line = file_splitter_line, \
           sub_filename_line_prefix = sub_filename_line_prefix, \
           sub_file_module_end_line = sub_file_module_end_line)
  # The two bundles share the helper code; only the __main__ footer differs.
  upgrade_pre_main_func_lines_str = get_main_func_str(do_upgrade_pre_filename)
  upgrade_post_main_func_lines_str = get_main_func_str(do_upgrade_post_filename)
  upgrade_pre_extra_lines_str = upgrade_common_lines + '\n' + upgrade_pre_main_func_lines_str
  upgrade_post_extra_lines_str = upgrade_common_lines + '\n' + upgrade_post_main_func_lines_str
  return (upgrade_pre_extra_lines_str, upgrade_post_extra_lines_str)
if __name__ == '__main__':
  # Names of the generated bundle scripts and of the modules whose
  # do_upgrade_by_argv each bundle invokes at run time.
  upgrade_pre_filename = 'upgrade_pre.py'
  upgrade_post_filename = 'upgrade_post.py'
  do_upgrade_pre_filename = 'do_upgrade_pre.py'
  do_upgrade_post_filename = 'do_upgrade_post.py'
  # Exclude this generator and previously generated bundles from the merge.
  cur_filename = sys.argv[0][sys.argv[0].rfind(os.sep)+1:]
  except_filter_filename_list = [cur_filename, upgrade_pre_filename, upgrade_post_filename]
  # Marker lines used to delimit the embedded sub files inside the bundle.
  file_splitter_line = '####====XXXX======######==== I am a splitter ====######======XXXX====####'
  sub_filename_line_prefix = '#filename:'
  sub_file_module_end_line = '#sub file module end'
  (upgrade_pre_extra_lines_str, upgrade_post_extra_lines_str) = \
      get_pre_and_post_extra_lines_strs(upgrade_pre_filename, upgrade_post_filename, \
                                        do_upgrade_pre_filename, do_upgrade_post_filename, \
                                        file_splitter_line, sub_filename_line_prefix, sub_file_module_end_line)
  py_filename_list = get_py_filename_list(except_filter_filename_list)
  concat_sub_files_lines = get_concat_sub_files_lines(py_filename_list, file_splitter_line, \
                                                      sub_filename_line_prefix, sub_file_module_end_line)
  # Both bundles share the same payload; only the __main__ footer differs.
  gen_upgrade_script(upgrade_pre_filename, concat_sub_files_lines, upgrade_pre_extra_lines_str)
  gen_upgrade_script(upgrade_post_filename, concat_sub_files_lines, upgrade_post_extra_lines_str)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
class MyError(Exception):
  """Error type raised by the upgrade helper scripts.

  Carries an arbitrary payload on `.value`; str() renders its repr.
  """
  def __init__(self, value):
    self.value = value
  def __str__(self):
    return '%r' % (self.value,)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import mysql.connector
from mysql.connector import errorcode
from my_error import MyError
from actions import QueryCursor
import logging
def results_to_str(desc, results):
  """Format a query result set as an aligned text table.

  desc    -- DB-API style cursor description; desc[i][0] is column i's name.
  results -- sequence of rows, each the same length as desc.

  Every column is left-justified to the width of its widest entry (header
  included) and columns are separated by four spaces.  Raises MyError when
  a row's length differs from the number of described columns.

  FIX: replaces quadratic `+=` string building and manual space padding
  with str.ljust and str.join; output is unchanged.
  """
  # Start each width from the header name, then widen to the largest cell.
  max_width_list = [len(str(col_desc[0])) for col_desc in desc]
  col_count = len(max_width_list)
  for result in results:
    if col_count != len(result):
      raise MyError('column count is not equal, desc column count: {0}, data column count: {1}'.format(col_count, len(result)))
    for i in range(0, col_count):
      result_col_width = len(str(result[i]))
      if max_width_list[i] < result_col_width:
        max_width_list[i] = result_col_width
  # Header line followed by one line per row; cells padded to column width.
  lines = ['    '.join(str(desc[i][0]).ljust(max_width_list[i]) for i in range(col_count))]
  for result in results:
    lines.append('    '.join(str(result[i]).ljust(max_width_list[i]) for i in range(col_count)))
  return '\n'.join(lines)
def query_and_dump_results(query_cur, sql):
  """Execute *sql* through *query_cur* and log the aligned result table."""
  (col_desc, rows) = query_cur.exec_query(sql)
  formatted = results_to_str(col_desc, rows)
  logging.info('dump query results, sql: %s, results:\n%s', sql, formatted)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
from actions import BaseDDLAction
from actions import reflect_action_cls_list
import logging
import time
import my_utils
import actions
'''
添加一条normal ddl的方法:
在本文件中,添加一个类名以"NormalDDLActionPost"开头并且继承自BaseDDLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)@staticmethod get_action_ddl():
返回action sql,并且该sql必须为ddl。
(5)@staticmethod get_rollback_sql():
返回回滚该action的sql。
(6)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(7)check_after_do_action(self):
执行action sql之后的检查。
(8)skip_action(self):
check if check_before_do_action() and do_action() can be skipped
改列示例:
class NormalDDLActionPostModifyAllRestoreInfoValue(BaseDDLAction):
@staticmethod
def get_seq_num():
return 12
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_restore_info""")
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_restore_info where field = 'value' and type = 'longtext'""")
return len(results) > 0
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_restore_info like 'value'""")
if len(results) != 1:
raise MyError('table oceanbase.__all_rootservice_event_history column value not exists')
@staticmethod
def get_action_ddl():
return """alter table oceanbase.__all_restore_info modify column `value` longtext NOT NULL"""
@staticmethod
def get_rollback_sql():
return """"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_restore_info""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_restore_info where field = 'value' and type = 'longtext'""")
if len(results) != 1:
raise MyError('fail to modify column value for oceanbase.__all_restore_info')
加列示例:
class NormalDDLActionPostAllTenantProfileAddVerifyFunction(BaseDDLAction):
@staticmethod
def get_seq_num():
return 0
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_tenant_profile""")
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_tenant_profile like 'password_verify_function'""")
return len(results) > 0;
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_tenant_profile like 'password_verify_function'""")
if len(results) != 0:
      raise MyError('password_verify_function column already exists')
@staticmethod
def get_action_ddl():
return """alter table oceanbase.__all_tenant_profile add column `password_verify_function` varchar(30) DEFAULT NULL id 23"""
@staticmethod
def get_rollback_sql():
return """alter table oceanbase.__all_tenant_profile drop column password_verify_function"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_tenant_profile""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_tenant_profile like 'password_verify_function'""")
if len(results) != 1:
raise MyError('failed to add column password_verify_function for oceanbase.__all_tenant_profile')
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def do_normal_ddl_actions(cur):
  """Run every post-upgrade normal DDL action in seq_num order.

  Temporarily enables sys-table DDL (and DDL generally, when it was off),
  runs each NormalDDLActionPost class, then restores both parameters.
  Does nothing at all when no action classes are registered.
  """
  import normal_ddl_actions_post
  ddl_cls_list = reflect_action_cls_list(normal_ddl_actions_post, 'NormalDDLActionPost')
  if len(ddl_cls_list) == 0:
    # No actions registered: leave the cluster parameters untouched.
    return
  # Allow DDL on system tables while the actions run.
  actions.set_parameter(cur, 'enable_sys_table_ddl' , 'True')
  ori_enable_ddl = actions.get_ori_enable_ddl(cur)
  if 0 == ori_enable_ddl:
    actions.set_parameter(cur, 'enable_ddl', 'True')
  for ddl_cls in ddl_cls_list:
    logging.info('do normal ddl acion, seq_num: %d', ddl_cls.get_seq_num())
    ddl_action = ddl_cls(cur)
    ddl_action.dump_before_do_action()
    if ddl_action.skip_action() == False:
      ddl_action.check_before_do_action()
      ddl_action.do_action()
    else:
      logging.info("skip ddl action, seq_num: %d", ddl_cls.get_seq_num())
    ddl_action.dump_after_do_action()
    ddl_action.check_after_do_action()
  # Restore the parameters to their pre-run state.
  if 0 == ori_enable_ddl:
    actions.set_parameter(cur, 'enable_ddl' , 'False')
  actions.set_parameter(cur, 'enable_sys_table_ddl' , 'False')
def get_normal_ddl_actions_sqls_str():
  """Return all post-upgrade normal DDL statements, each terminated with
  ';' and joined by newlines."""
  import normal_ddl_actions_post
  ddl_cls_list = reflect_action_cls_list(normal_ddl_actions_post, 'NormalDDLActionPost')
  return '\n'.join(ddl_cls.get_action_ddl() + ';' for ddl_cls in ddl_cls_list)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
from actions import BaseDDLAction
from actions import reflect_action_cls_list
from actions import fetch_observer_version
from actions import QueryCursor
import logging
import time
import my_utils
import actions
import re
class UpgradeParams:
  """Static version bounds for the pre-upgrade normal-DDL script."""
  low_version = '1.4.73'   # presumably the lowest supported source version -- not read within this class
  high_version = '2.0.0'   # presumably the upper version bound -- not read within this class
'''
添加一条normal ddl的方法:
在本文件中,添加一个类名以"NormalDDLActionPre"开头并且继承自BaseDDLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)@staticmethod get_action_ddl():
返回action sql,并且该sql必须为ddl。
(5)@staticmethod get_rollback_sql():
返回回滚该action的sql。
(6)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(7)check_after_do_action(self):
执行action sql之后的检查。
(8)skip_action(self):
check if check_before_do_action() and do_action() can be skipped
加表示例:
class NormalDDLActionPreAddAllBackupBackupLogArchiveStatusHistory(BaseDDLAction):
table_name = '__all_backup_backup_log_archive_stat'
@staticmethod
def get_seq_num():
return 102
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """show tables from oceanbase like '{0}'""".format(self.table_name))
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""show tables from oceanbase like '{0}'""".format(self.table_name))
return (len(results) > 0)
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show tables from oceanbase like '{0}'""".format(self.table_name))
if len(results) > 0:
      raise MyError("""table oceanbase.{0} already exists""".format(self.table_name))
@staticmethod
def get_action_ddl():
return """
CREATE TABLE `__all_backup_backup_log_archive_status_history` (
`gmt_create` timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6),
`gmt_modified` timestamp(6) NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6),
`tenant_id` bigint(20) NOT NULL,
`incarnation` bigint(20) NOT NULL,
`log_archive_round` bigint(20) NOT NULL,
`copy_id` bigint(20) NOT NULL,
`min_first_time` timestamp(6) NOT NULL,
`max_next_time` timestamp(6) NOT NULL,
`input_bytes` bigint(20) NOT NULL DEFAULT '0',
`output_bytes` bigint(20) NOT NULL DEFAULT '0',
`deleted_input_bytes` bigint(20) NOT NULL DEFAULT '0',
`deleted_output_bytes` bigint(20) NOT NULL DEFAULT '0',
`pg_count` bigint(20) NOT NULL DEFAULT '0',
`backup_dest` varchar(2048) DEFAULT NULL,
`is_mark_deleted` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`tenant_id`, `incarnation`, `log_archive_round`, `copy_id`)
) TABLE_ID = 1099511628080 DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMPRESSION = 'none' REPLICA_NUM = 1 BLOCK_SIZE = 16384 USE_BLOOM_FILTER = FALSE TABLET_SIZE = 134217728 PCTFREE = 10 TABLEGROUP = 'oceanbase'
"""
@staticmethod
def get_rollback_sql():
return """"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """show tables from oceanbase like '{0}'""".format(self.table_name))
my_utils.query_and_dump_results(self._query_cursor, """show columns from oceanbase.{0}""".format(self.table_name))
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show tables from oceanbase like '{0}'""".format(self.table_name))
if len(results) != 1:
raise MyError("""table oceanbase.{0} not exists""".format(self.table_name))
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.{0}""".format(self.table_name))
if len(results) != 15:
raise MyError("""table oceanbase.{0} has invalid column descs""".format(self.table_name))
改列示例:
class NormalDDLActionPreModifyAllRestoreInfoValue(BaseDDLAction):
@staticmethod
def get_seq_num():
return 12
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_restore_info""")
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_restore_info where field = 'value' and type = 'longtext'""")
return len(results) > 0
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_restore_info like 'value'""")
if len(results) != 1:
raise MyError('table oceanbase.__all_rootservice_event_history column value not exists')
@staticmethod
def get_action_ddl():
return """alter table oceanbase.__all_restore_info modify column `value` longtext NOT NULL"""
@staticmethod
def get_rollback_sql():
return """"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_restore_info""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_restore_info where field = 'value' and type = 'longtext'""")
if len(results) != 1:
raise MyError('fail to modify column value for oceanbase.__all_restore_info')
加列示例:
class NormalDDLActionPreAllTenantProfileAddVerifyFunction(BaseDDLAction):
@staticmethod
def get_seq_num():
return 0
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_tenant_profile""")
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_tenant_profile like 'password_verify_function'""")
return len(results) > 0;
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_tenant_profile like 'password_verify_function'""")
if len(results) != 0:
      raise MyError('password_verify_function column already exists')
@staticmethod
def get_action_ddl():
return """alter table oceanbase.__all_tenant_profile add column `password_verify_function` varchar(30) DEFAULT NULL id 23"""
@staticmethod
def get_rollback_sql():
return """alter table oceanbase.__all_tenant_profile drop column password_verify_function"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """desc oceanbase.__all_tenant_profile""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""show columns from oceanbase.__all_tenant_profile like 'password_verify_function'""")
if len(results) != 1:
raise MyError('failed to add column password_verify_function for oceanbase.__all_tenant_profile')
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def do_normal_ddl_actions(cur):
  """Run every pre-upgrade normal DDL action in seq_num order.

  Temporarily enables sys-table DDL (and DDL generally, when it was off),
  runs each NormalDDLActionPre class from normal_ddl_actions_pre, then
  restores both parameters.  On observer versions in [2.2.77, 3.0.0) or
  >= 3.1.1 each action's skip_action() hook is honoured so the script can
  be re-run safely.
  """
  import normal_ddl_actions_pre
  # FIX: dropped the unused local `upgrade_params = UpgradeParams()`.
  cls_list = reflect_action_cls_list(normal_ddl_actions_pre, 'NormalDDLActionPre')
  # check if pre upgrade script can run reentrantly
  query_cur = QueryCursor(cur)
  version = fetch_observer_version(query_cur)
  can_skip = False
  if (cmp(version, "2.2.77") >= 0 and cmp(version, "3.0.0") < 0):
    can_skip = True
  elif (cmp(version, "3.1.1") >= 0):
    can_skip = True
  else:
    can_skip = False
  # set parameter
  if len(cls_list) > 0:
    actions.set_parameter(cur, 'enable_sys_table_ddl' , 'True')
    ori_enable_ddl = actions.get_ori_enable_ddl(cur)
    if ori_enable_ddl == 0:
      actions.set_parameter(cur, 'enable_ddl', 'True')
  for cls in cls_list:
    logging.info('do normal ddl acion, seq_num: %d', cls.get_seq_num())
    action = cls(cur)
    action.dump_before_do_action()
    if False == can_skip or False == action.skip_action():
      action.check_before_do_action()
      action.do_action()
    else:
      logging.info("skip ddl action, seq_num: %d", cls.get_seq_num())
    action.dump_after_do_action()
    action.check_after_do_action()
  # reset parameter
  if len(cls_list) > 0:
    if ori_enable_ddl == 0:
      actions.set_parameter(cur, 'enable_ddl' , 'False')
    actions.set_parameter(cur, 'enable_sys_table_ddl' , 'False')
def get_normal_ddl_actions_sqls_str(query_cur):
  """Return all pre-upgrade normal DDL statements, each terminated with
  ';' and joined by newlines.  *query_cur* is unused but kept for
  interface compatibility with callers."""
  import normal_ddl_actions_pre
  ddl_cls_list = reflect_action_cls_list(normal_ddl_actions_pre, 'NormalDDLActionPre')
  return '\n'.join(ddl_cls.get_action_ddl() + ';' for ddl_cls in ddl_cls_list)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import mysql.connector
from mysql.connector import errorcode
from actions import BaseDMLAction
from actions import reflect_action_cls_list
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import logging
import my_utils
'''
添加一条normal dml的方法:
在本文件中,添加一个类名以"NormalDMLActionPost"开头并且继承自BaseDMLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)@staticmethod get_action_dml():
返回action sql,并且该sql必须为dml。
(5)@staticmethod get_rollback_sql():
返回回滚该action的sql。
(6)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(7)check_after_do_action(self):
执行action sql之后的检查。
(8)skip_action(self):
check if check_before_do_action() and do_action() can be skipped
举例:
class NormalDMLActionPost1(BaseDMLAction):
@staticmethod
def get_seq_num():
return 0
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
return (len(results) > 0)
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
if len(results) > 0:
raise MyError('some row in table test.for_test whose primary key is 9 already exists')
@staticmethod
def get_action_dml():
return """insert into test.for_test values (9, 'haha', 99)"""
@staticmethod
def get_rollback_sql():
return """delete from test.for_test where pk = 9"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
if len(results) != 1:
raise MyError('there should be only one row whose primary key is 9 in table test.for_test, but there has {0} rows like that'.format(len(results)))
elif results[0][0] != 9 or results[0][1] != 'haha' or results[0][2] != 99:
raise MyError('the row that has been inserted is not expected, it is: [{0}]'.format(','.join(str(r) for r in results[0])))
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def do_normal_dml_actions_by_standby_cluster(standby_cluster_infos):
  """Replay the normal DML actions on every standby cluster.

  For each standby cluster info dict (keys: cluster_id, ip, port, user, pwd)
  a fresh autocommit connection is opened, the cluster is verified to still
  be a standby, and do_normal_dml_actions() is run against it.

  Raises MyError when a supposedly-standby cluster reports itself primary;
  re-raises any connection/SQL error after logging it.
  """
  try:
    for standby_cluster_info in standby_cluster_infos:
      logging.info("do_normal_dml_actions_by_standby_cluster: cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster_info['cluster_id'],
                           standby_cluster_info['ip'],
                           standby_cluster_info['port']))
      logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster_info['cluster_id'],
                           standby_cluster_info['ip'],
                           standby_cluster_info['port']))
      conn = mysql.connector.connect(user     = standby_cluster_info['user'],
                                     password = standby_cluster_info['pwd'],
                                     host     = standby_cluster_info['ip'],
                                     port     = standby_cluster_info['port'],
                                     database = 'oceanbase',
                                     raise_on_warnings = True)
      cur = conn.cursor(buffered=True)
      conn.autocommit = True
      query_cur = QueryCursor(cur)
      is_primary = check_current_cluster_is_primary(query_cur)
      if is_primary:
        # BUGFIX: this branch used `raise e` with no `e` bound, which
        # surfaced as a NameError instead of a meaningful failure.
        # (Also: logging.exception outside an except handler logs a bogus
        # "NoneType: None" traceback, so use warn here.)
        logging.warn("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
                     .format(standby_cluster_info['cluster_id'],
                             standby_cluster_info['ip'],
                             standby_cluster_info['port']))
        raise MyError('cluster {0} is primary, expect standby'
                      .format(standby_cluster_info['cluster_id']))
      ## process
      do_normal_dml_actions(cur)
      cur.close()
      conn.close()
  except Exception as e:
    logging.exception("""do_normal_dml_actions_by_standby_cluster failed""")
    raise e
def do_normal_dml_actions(cur):
  """Execute every NormalDMLActionPost action in ascending seq_num order."""
  import normal_dml_actions_post
  action_classes = reflect_action_cls_list(normal_dml_actions_post, 'NormalDMLActionPost')
  for action_cls in action_classes:
    logging.info('do normal dml acion, seq_num: %d', action_cls.get_seq_num())
    action = action_cls(cur)
    action.dump_before_do_action()
    if False == action.skip_action():
      action.check_before_do_action()
      action.do_action()
    else:
      logging.info("skip dml action, seq_num: %d", action_cls.get_seq_num())
    action.dump_after_do_action()
    action.check_after_do_action()
def get_normal_dml_actions_sqls_str():
  """Return every NormalDMLActionPost SQL, ';'-terminated, one per line."""
  import normal_dml_actions_post
  action_classes = reflect_action_cls_list(normal_dml_actions_post, 'NormalDMLActionPost')
  return '\n'.join(action_cls.get_action_dml() + ';' for action_cls in action_classes)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import mysql.connector
from mysql.connector import errorcode
from actions import BaseDMLAction
from actions import reflect_action_cls_list
from actions import fetch_observer_version
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import logging
import my_utils
'''
添加一条normal dml的方法:
在本文件中,添加一个类名以"NormalDMLActionPre"开头并且继承自BaseDMLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)@staticmethod get_action_dml():
返回action sql,并且该sql必须为dml。
(5)@staticmethod get_rollback_sql():
返回回滚该action的sql。
(6)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(7)check_after_do_action(self):
执行action sql之后的检查。
(8)skip_action(self):
check if check_before_do_action() and do_action() can be skipped
举例:
class NormalDMLActionPre1(BaseDMLAction):
@staticmethod
def get_seq_num():
return 0
def dump_before_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
def skip_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
return (len(results) > 0)
def check_before_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
if len(results) > 0:
raise MyError('some row in table test.for_test whose primary key is 9 already exists')
@staticmethod
def get_action_dml():
return """insert into test.for_test values (9, 'haha', 99)"""
@staticmethod
def get_rollback_sql():
return """delete from test.for_test where pk = 9"""
def dump_after_do_action(self):
my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
def check_after_do_action(self):
(desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
if len(results) != 1:
raise MyError('there should be only one row whose primary key is 9 in table test.for_test, but there has {0} rows like that'.format(len(results)))
elif results[0][0] != 9 or results[0][1] != 'haha' or results[0][2] != 99:
raise MyError('the row that has been inserted is not expected, it is: [{0}]'.format(','.join(str(r) for r in results[0])))
'''
#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。
####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####
def do_normal_dml_actions(cur):
  """Execute every NormalDMLActionPre action in ascending seq_num order.

  Actions may be skipped (so the pre-upgrade script is re-entrant) only when
  the observer version supports the skip checks: [2.2.77, 3.0.0) or >= 3.1.1.
  """
  import normal_dml_actions_pre
  action_classes = reflect_action_cls_list(normal_dml_actions_pre, 'NormalDMLActionPre')
  # check if pre upgrade script can run reentrantly
  query_cur = QueryCursor(cur)
  version = fetch_observer_version(query_cur)
  can_skip = ((cmp(version, "2.2.77") >= 0 and cmp(version, "3.0.0") < 0)
              or cmp(version, "3.1.1") >= 0)
  for action_cls in action_classes:
    logging.info('do normal dml acion, seq_num: %d', action_cls.get_seq_num())
    action = action_cls(cur)
    action.dump_before_do_action()
    if False == can_skip or False == action.skip_action():
      action.check_before_do_action()
      action.do_action()
    else:
      logging.info("skip dml action, seq_num: %d", action_cls.get_seq_num())
    action.dump_after_do_action()
    action.check_after_do_action()
def do_normal_dml_actions_by_standby_cluster(standby_cluster_infos):
  """Replay the normal DML actions on every standby cluster.

  For each standby cluster info dict (keys: cluster_id, ip, port, user, pwd)
  a fresh autocommit connection is opened, the cluster is verified to still
  be a standby, and do_normal_dml_actions() is run against it.

  Raises MyError when a supposedly-standby cluster reports itself primary;
  re-raises any connection/SQL error after logging it.
  """
  try:
    for standby_cluster_info in standby_cluster_infos:
      logging.info("do_normal_dml_actions_by_standby_cluster: cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster_info['cluster_id'],
                           standby_cluster_info['ip'],
                           standby_cluster_info['port']))
      logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster_info['cluster_id'],
                           standby_cluster_info['ip'],
                           standby_cluster_info['port']))
      conn = mysql.connector.connect(user     = standby_cluster_info['user'],
                                     password = standby_cluster_info['pwd'],
                                     host     = standby_cluster_info['ip'],
                                     port     = standby_cluster_info['port'],
                                     database = 'oceanbase',
                                     raise_on_warnings = True)
      cur = conn.cursor(buffered=True)
      conn.autocommit = True
      query_cur = QueryCursor(cur)
      is_primary = check_current_cluster_is_primary(query_cur)
      if is_primary:
        # BUGFIX: this branch used `raise e` with no `e` bound, which
        # surfaced as a NameError instead of a meaningful failure.
        # (Also: logging.exception outside an except handler logs a bogus
        # "NoneType: None" traceback, so use warn here.)
        logging.warn("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
                     .format(standby_cluster_info['cluster_id'],
                             standby_cluster_info['ip'],
                             standby_cluster_info['port']))
        raise MyError('cluster {0} is primary, expect standby'
                      .format(standby_cluster_info['cluster_id']))
      ## process
      do_normal_dml_actions(cur)
      cur.close()
      conn.close()
  except Exception as e:
    logging.exception("""do_normal_dml_actions_by_standby_cluster failed""")
    raise e
def get_normal_dml_actions_sqls_str():
  """Return every NormalDMLActionPre SQL, ';'-terminated, one per line."""
  import normal_dml_actions_pre
  action_classes = reflect_action_cls_list(normal_dml_actions_pre, 'NormalDMLActionPre')
  return '\n'.join(action_cls.get_action_dml() + ';' for action_cls in action_classes)
# 描述oceanbase各个版本升级依赖关系
# 对于每一个正式发布的ob版本,在下面的yaml文档中添加一项,包括如下属性:
# * version: 待升级的版本,或者升级过程中经过的版本;一般用A.B.C的版本号,如果是A.B.C-D的形式,表示build号必须大于等于D
# * can_be_upgraded_to: 表示该版本经过OB QA兼容性测试,可以*直接*升级到的ob版本号,是一个列表可以包括多个版本
# * deprecated: 缺省为False。如果为True,表示这个版本已经废除。可以作
# 为升级的起点,可以作为升级过度版本,但是不能作为升级目标版本。
# * require_from_binary: 缺省为False。如果为True,表示升级过程中如果作为中间版本,除了运行upgrade脚本,还需要把observer也升级到该版本
#
# OCP的OB升级模块会读取本文件,给定升级的起始版本和目标版本,自动在满
# 足上述约束的情况下寻找一个升级的最短路径。基本算法是:
# 基于如下描述构建一个图,每个版本一个结点,can_be_upgraded_to关系定义
# 一条边,寻找起始版本和升级目标两个版本之间的最短路径。除了起始点,其
# 他节点不能是deprecated。如果有A.B.C和A.B.C-D两种节点匹配,优先选择后
# 者。
#
--- # oceanbase version dependencies
- version: 3.1.1
can_be_upgraded_to:
- 3.1.2
- version: 3.1.2
can_be_upgraded_to:
- 3.1.3
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from my_error import MyError
import sys
import os
import getopt
help_str = \
"""
Help:
""" +\
sys.argv[0] + """ [OPTIONS]""" +\
'\n\n' +\
'-I, --help Display this help and exit.\n' +\
'-V, --version Output version information and exit.\n' +\
'-h, --host=name Connect to host.\n' +\
'-P, --port=name Port number to use for connection.\n' +\
'-u, --user=name User for login.\n' +\
'-p, --password=name Password to use when connecting to server. If password is\n' +\
' not given it\'s empty string "".\n' +\
'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
' the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
' system_variable_dml, special_action, all. "all" represents\n' +\
' that all modules should be run. They are splitted by ",".\n' +\
' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
'\n\n' +\
'Maybe you want to run cmd like that:\n' +\
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
version_str = """version 1.0.0"""
class Option:
  """One command line option, e.g. '-h' / '--host'.

  Class-level registries ensure that no two Option instances ever share a
  short or a long name.
  """
  # names already taken, shared by all instances
  __g_short_name_set = set([])
  __g_long_name_set = set([])
  __short_name = None
  __long_name = None
  __is_with_param = None
  __is_local_opt = None
  __has_value = None
  __value = None
  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
    if short_name in Option.__g_short_name_set:
      raise MyError('duplicate option short name: {0}'.format(short_name))
    if long_name in Option.__g_long_name_set:
      raise MyError('duplicate option long name: {0}'.format(long_name))
    Option.__g_short_name_set.add(short_name)
    Option.__g_long_name_set.add(long_name)
    self.__short_name = short_name
    self.__long_name = long_name
    self.__is_with_param = is_with_param
    self.__is_local_opt = is_local_opt
    self.__has_value = False
    if default_value is not None:
      self.set_value(default_value)
  def is_with_param(self):
    return self.__is_with_param
  def get_short_name(self):
    return self.__short_name
  def get_long_name(self):
    return self.__long_name
  def has_value(self):
    return self.__has_value
  def get_value(self):
    return self.__value
  def set_value(self, value):
    # setting any value (including None explicitly) marks the option as set
    self.__value = value
    self.__has_value = True
  def is_local_opt(self):
    return self.__is_local_opt
  def is_valid(self):
    return (self.__short_name is not None and self.__long_name is not None
            and self.__has_value and self.__value is not None)
g_opts =\
[\
Option('I', 'help', False, True),\
Option('V', 'version', False, True),\
Option('h', 'host', True, False),\
Option('P', 'port', True, False),\
Option('u', 'user', True, False),\
Option('p', 'password', True, False, ''),\
# 要跑哪个模块,默认全跑
Option('m', 'module', True, False, 'all'),\
# 日志文件路径,不同脚本的main函数中中会改成不同的默认值
Option('l', 'log-file', True, False)
]\
def change_opt_defult_value(opt_long_name, opt_default_val):
  """Overwrite the default value of the option whose long name matches."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == opt_long_name:
      candidate.set_value(opt_default_val)
      return
def has_no_local_opts():
  """Return True when no local option (help/version) was given a value."""
  global g_opts
  for candidate in g_opts:
    if candidate.is_local_opt() and candidate.has_value():
      return False
  return True
def check_db_client_opts():
  """Raise MyError when any required (non-local) option is missing a value."""
  global g_opts
  for candidate in g_opts:
    if not candidate.is_local_opt() and not candidate.has_value():
      raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
          .format(candidate.get_short_name(), sys.argv[0]))
def parse_option(opt_name, opt_val):
  """Store opt_val on the option matching opt_name ('-x' or '--xxx')."""
  global g_opts
  for candidate in g_opts:
    short_form = '-' + candidate.get_short_name()
    long_form = '--' + candidate.get_long_name()
    if opt_name == short_form or opt_name == long_form:
      candidate.set_value(opt_val)
def parse_options(argv):
  """Parse argv with getopt against g_opts; validate db options when needed."""
  global g_opts
  short_opt_str = ''
  long_opt_list = []
  # build the getopt specs: ':'/'=' suffixes mark options taking a parameter
  for candidate in g_opts:
    short_opt_str += candidate.get_short_name() + (':' if candidate.is_with_param() else '')
  for candidate in g_opts:
    long_opt_list.append(candidate.get_long_name() + ('=' if candidate.is_with_param() else ''))
  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
  for (opt_name, opt_val) in opts:
    parse_option(opt_name, opt_val)
  # only validate the mandatory connection options when no local option
  # (help/version) short-circuits the run
  if has_no_local_opts():
    check_db_client_opts()
def deal_with_local_opt(opt):
  """Print the text associated with one local option: help or version."""
  if 'help' == opt.get_long_name():
    # 'global' is redundant for a read-only access; kept as in the original
    global help_str
    print help_str
  elif 'version' == opt.get_long_name():
    global version_str
    print version_str
def deal_with_local_opts():
  """Handle the first local option (help/version) that has a value."""
  global g_opts
  if has_no_local_opts():
    raise MyError('no local options, can not deal with local options')
  for candidate in g_opts:
    if candidate.is_local_opt() and candidate.has_value():
      deal_with_local_opt(candidate)
      # deliberately handle only the first matching option
      return
def get_opt_host():
  """Return the value of --host."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == 'host':
      return candidate.get_value()
def get_opt_port():
  """Return the value of --port."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == 'port':
      return candidate.get_value()
def get_opt_user():
  """Return the value of --user."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == 'user':
      return candidate.get_value()
def get_opt_password():
  """Return the value of --password."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == 'password':
      return candidate.get_value()
def get_opt_module():
  """Return the value of --module."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == 'module':
      return candidate.get_value()
def get_opt_log_file():
  """Return the value of --log-file."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() == 'log-file':
      return candidate.get_value()
#parse_options(sys.argv[1:])
此差异已折叠。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
def clear_action_codes(action_filename_list, action_begin_line,
                       action_end_line, is_special_upgrade_code):
  """Strip generated action code between the begin/end marker lines.

  Every line strictly between action_begin_line and action_end_line is
  removed from each file in action_filename_list; the two marker lines
  themselves are kept. When is_special_upgrade_code is True a bare
  'return' line is re-inserted just before the end marker so the enclosing
  function body stays syntactically valid after clearing.
  """
  char_enter = '\n'
  for action_filename in action_filename_list:
    kept_lines = []
    # BUGFIX: use 'with' so the file handles are closed even when an IO
    # error occurs (the original leaked the handles on exception).
    with open(action_filename, 'r') as action_file:
      inside_action_block = False
      for line in action_file.readlines():
        if inside_action_block and line == (action_end_line + char_enter):
          inside_action_block = False
        if not inside_action_block:
          kept_lines.append(line)
        if not inside_action_block and line == (action_begin_line + char_enter):
          inside_action_block = True
    with open(action_filename, 'w') as new_action_file:
      for kept_line in kept_lines:
        if is_special_upgrade_code and kept_line == (action_end_line + char_enter):
          new_action_file.write(' return\n')
        new_action_file.write(kept_line)
def regenerate_upgrade_script():
  """Invoke ./gen_upgrade_scripts.py and echo its output to stdout."""
  print('\n=========run gen_upgrade_scripts.py, begin=========\n')
  pipe = os.popen('./gen_upgrade_scripts.py;')
  print(pipe.read())
  print('\n=========run gen_upgrade_scripts.py, end=========\n')
# Entry point: wipe the machine-generated action code from every upgrade
# script, then regenerate the aggregated upgrade scripts.
if __name__ == '__main__':
  # these two marker lines are matched verbatim by clear_action_codes()
  action_begin_line = '####========******####======== actions begin ========####******========####'
  action_end_line = '####========******####========= actions end =========####******========####'
  action_filename_list = \
  [\
  'normal_ddl_actions_pre.py',\
  'normal_ddl_actions_post.py',\
  'normal_dml_actions_pre.py',\
  'normal_dml_actions_post.py',\
  'each_tenant_dml_actions_pre.py',\
  'each_tenant_dml_actions_post.py',\
  'each_tenant_ddl_actions_post.py'\
  ]
  # these files hold code inside a function body, so a bare 'return' must be
  # re-inserted after clearing (is_special_upgrade_code=True below)
  special_upgrade_filename_list = \
  [\
  'special_upgrade_action_pre.py',\
  'special_upgrade_action_post.py'
  ]
  clear_action_codes(action_filename_list, action_begin_line, action_end_line, False)
  clear_action_codes(special_upgrade_filename_list, action_begin_line, action_end_line, True)
  regenerate_upgrade_script()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
ALL_MODULE = 'all'
MODULE_DDL = 'ddl'
MODULE_NORMAL_DML = 'normal_dml'
MODULE_EACH_TENANT_DML = 'each_tenant_dml'
MODULE_EACH_TENANT_DDL = 'each_tenant_ddl'
MODULE_SYSTEM_VARIABLE_DML = 'system_variable_dml'
MODULE_SPECIAL_ACTION = 'special_action'
def get_all_module_set():
  """Collect the values of every MODULE_* constant defined in run_modules."""
  import run_modules
  return set(getattr(run_modules, attr)
             for attr in dir(run_modules)
             if attr.startswith('MODULE_'))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import time
from actions import Cursor
from actions import DMLCursor
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import mysql.connector
from mysql.connector import errorcode
import actions
def do_special_upgrade(conn, cur, tenant_id_list, user, pwd):
  """Run the version-specific upgrade steps of the post phase.

  The code between the 'actions begin' / 'actions end' marker comments is
  machine-managed: reset_upgrade_scripts.py clears exactly the lines between
  the two markers when the baseline version is bumped, so the marker lines
  themselves must never be edited.
  """
  # special upgrade action
  # Version-specific actions must live between the 'actions begin' and
  # 'actions end' marker lines below; anything outside them would not be
  # cleaned up by reset_upgrade_scripts.py on a baseline update.
  ####========******####======== actions begin ========####******========####
  run_upgrade_job(conn, cur, "3.1.2")
  return
  ####========******####========= actions end =========####******========####
def trigger_schema_split_job(conn, cur, user, pwd):
  """Trigger the SCHEMA_SPLIT_V2 job on the primary cluster and every standby.

  The current cluster must be primary, and every fetched standby must still
  be a standby when reached. Raises on any failure.
  """
  try:
    query_cur = actions.QueryCursor(cur)
    is_primary = actions.check_current_cluster_is_primary(query_cur)
    if not is_primary:
      # BUGFIX: this branch used `raise e` with no `e` bound, which raised
      # a NameError instead of a meaningful error; log message typo fixed.
      logging.warn("current cluster should be primary")
      raise Exception("current cluster should be primary")
    # primary cluster
    trigger_schema_split_job_by_cluster(conn, cur)
    # standby clusters
    standby_cluster_list = actions.fetch_standby_cluster_infos(conn, query_cur, user, pwd)
    for standby_cluster in standby_cluster_list:
      # connect
      logging.info("start to trigger schema split by cluster: cluster_id = {0}"
                   .format(standby_cluster['cluster_id']))
      logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster['cluster_id'],
                           standby_cluster['ip'],
                           standby_cluster['port']))
      tmp_conn = mysql.connector.connect(user     = standby_cluster['user'],
                                         password = standby_cluster['pwd'],
                                         host     = standby_cluster['ip'],
                                         port     = standby_cluster['port'],
                                         database = 'oceanbase')
      tmp_cur = tmp_conn.cursor(buffered=True)
      tmp_conn.autocommit = True
      tmp_query_cur = actions.QueryCursor(tmp_cur)
      # verify it is still a standby cluster
      is_primary = actions.check_current_cluster_is_primary(tmp_query_cur)
      if is_primary:
        # BUGFIX: same unbound `raise e` as above; also logging.exception
        # outside an except handler logs a bogus traceback.
        logging.warn("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
                     .format(standby_cluster['cluster_id'],
                             standby_cluster['ip'],
                             standby_cluster['port']))
        raise Exception("primary cluster changed")
      # trigger schema split
      trigger_schema_split_job_by_cluster(tmp_conn, tmp_cur)
      # close
      tmp_cur.close()
      tmp_conn.close()
      logging.info("""trigger schema split success : cluster_id = {0}, ip = {1}, port = {2}"""
                   .format(standby_cluster['cluster_id'],
                           standby_cluster['ip'],
                           standby_cluster['port']))
  except Exception as e:
    logging.warn("trigger schema split failed")
    raise e
  logging.info("trigger schema split success")
def trigger_schema_split_job_by_cluster(conn, cur):
  """Insert (if absent) the FAILED SCHEMA_SPLIT_V2 marker row that starts the job.

  Raises Exception on any unexpected query result.
  """
  try:
    # check whether a SCHEMA_SPLIT_V2 record already exists
    sql = "select count(*) from oceanbase.__all_rootservice_job where job_type = 'SCHEMA_SPLIT_V2';"
    logging.info(sql)
    cur.execute(sql)
    result = cur.fetchall()
    if 1 != len(result) or 1 != len(result[0]):
      # BUGFIX: the error paths below used `raise e` with no `e` bound,
      # which raised NameError instead of a meaningful error.
      logging.warn("unexpected result cnt")
      raise Exception("unexpected result cnt")
    elif 0 == result[0][0]:
      # insert a FAILED record to start the job
      sql = "replace into oceanbase.__all_rootservice_job(job_id, job_type, job_status, progress, rs_svr_ip, rs_svr_port) values (0, 'SCHEMA_SPLIT_V2', 'FAILED', 100, '0.0.0.0', '0');"
      logging.info(sql)
      cur.execute(sql)
      # verify the record was inserted
      sql = "select count(*) from oceanbase.__all_rootservice_job where job_type = 'SCHEMA_SPLIT_V2' and job_status = 'FAILED';"
      logging.info(sql)
      cur.execute(sql)
      result = cur.fetchall()
      if 1 != len(result) or 1 != len(result[0]) or 1 != result[0][0]:
        logging.warn("schema split record should be 1")
        raise Exception("schema split record should be 1")
  except Exception as e:
    logging.warn("start schema split task failed")
    raise e
  logging.info("start schema split task success")
def query(cur, sql):
  """Execute sql on cur and return all fetched rows."""
  cur.execute(sql)
  return cur.fetchall()
def get_tenant_names(cur):
  """Return the list of tenant names from oceanbase.__all_tenant."""
  rows = query(cur, 'select tenant_name from oceanbase.__all_tenant')
  return [row[0] for row in rows]
def update_cluster_update_table_schema_version(conn, cur):
  """Run the UPDATE_TABLE_SCHEMA_VERSION root-service job.

  Sleeps 30 seconds first (presumably to let the cluster settle after the
  previous step — TODO confirm), bumps the session query timeout, then
  fires the job. Re-raises any SQL error after logging it.
  """
  time.sleep(30)
  try:
    timeout_stmt = "set ob_query_timeout = 30000000;"
    logging.info(timeout_stmt)
    cur.execute(timeout_stmt)
    job_stmt = "alter system run job 'UPDATE_TABLE_SCHEMA_VERSION';"
    logging.info(job_stmt)
    cur.execute(job_stmt)
  except Exception as e:
    logging.warn("run update table schema version job failed")
    raise e
  logging.info("run update table schema version job success")
def run_create_inner_schema_job(conn, cur):
  """Run the CREATE_INNER_SCHEMA root-service job and wait for it to finish.

  Temporarily enables DDL if it was disabled, fires the job, polls
  __all_rootservice_job (10s interval, 180-tick budget shared across both
  wait loops, as in the original) until a new job row appears and reaches
  SUCCESS, then restores the original enable_ddl setting.

  Raises Exception on timeout, FAILED status or unexpected query output.
  BUGFIX: the original error paths used `raise e` with no `e` bound, and
  `MyError` which is not imported in this file — both raised NameError.
  """
  try:
    ###### enable ddl
    ori_enable_ddl = actions.get_ori_enable_ddl(cur)
    if ori_enable_ddl == 0:
      actions.set_parameter(cur, 'enable_ddl', 'True')
    # snapshot how many CREATE_INNER_SCHEMA jobs already exist so a newly
    # inserted row can be detected below
    count_sql = """select count(*) from oceanbase.__all_rootservice_job
                   where job_type = 'CREATE_INNER_SCHEMA';"""
    result = query(cur, count_sql)
    if 1 != len(result):
      logging.warn("unexpected sql output")
      raise Exception("unexpected sql output")
    job_count = result[0][0]
    # run job
    sql = "alter system run job 'CREATE_INNER_SCHEMA';"
    logging.info(sql)
    cur.execute(sql)
    # wait job finish
    times = 180
    ## first wait until the new job row shows up
    count_check = False
    while times > 0:
      result = query(cur, count_sql)
      if 1 != len(result):
        logging.warn("unexpected sql output")
        raise Exception("unexpected sql output")
      elif result[0][0] > job_count:
        count_check = True
        logging.info('create_inner_schema job detected')
        break
      time.sleep(10)
      times -= 1
    if times == 0:
      raise Exception('check create_inner_schema job failed!')
    ## then poll the job status until SUCCESS
    status_sql = """select job_status from oceanbase.__all_rootservice_job
                    where job_type = 'CREATE_INNER_SCHEMA' order by job_id desc limit 1;"""
    status_check = False
    while times > 0 and count_check == True:
      result = query(cur, status_sql)
      if 0 == len(result):
        logging.warn("unexpected sql output")
        raise Exception("unexpected sql output")
      elif 1 != len(result) or 1 != len(result[0]):
        logging.warn("result len not match")
        raise Exception("result len not match")
      elif result[0][0] == "FAILED":
        logging.warn("run create_inner_schema job faild")
        raise Exception("run create_inner_schema job faild")
      elif result[0][0] == "INPROGRESS":
        logging.info('create_inner_schema job is still running')
      elif result[0][0] == "SUCCESS":
        status_check = True
        break
      else:
        # BUGFIX: original used '%' with a '{0}' format string (TypeError)
        logging.warn("invalid result: {0}".format(result[0][0]))
        raise Exception("invalid result: {0}".format(result[0][0]))
      time.sleep(10)
      times -= 1
    if times == 0:
      raise Exception('check create_inner_schema job failed!')
    if status_check == True and count_check == True:
      logging.info('check create_inner_schema job success')
    else:
      logging.warn("run create_inner_schema job faild")
      raise Exception("run create_inner_schema job faild")
    # disable ddl again if we enabled it
    if ori_enable_ddl == 0:
      actions.set_parameter(cur, 'enable_ddl', 'False')
  except Exception as e:
    logging.warn("run create_inner_schema job failed")
    raise e
  logging.info("run create_inner_schema job success")
def statistic_primary_zone_count(conn, cur):
  """Run the STATISTIC_PRIMARY_ZONE_ENTITY_COUNT job and wait for it to finish.

  Temporarily disables DDL if it was enabled, fires the job, polls
  __all_rootservice_job (10s interval, 180-tick budget shared across both
  wait loops, as in the original) until a new job row appears and reaches
  SUCCESS, then restores the original enable_ddl setting.

  Raises Exception on timeout, FAILED status or unexpected query output.
  BUGFIX: the original error paths used `raise e` with no `e` bound, and
  `MyError` which is not imported in this file — both raised NameError.
  """
  try:
    ###### disable ddl
    ori_enable_ddl = actions.get_ori_enable_ddl(cur)
    if ori_enable_ddl == 1:
      actions.set_parameter(cur, 'enable_ddl', 'False')
    # snapshot how many such jobs already exist so a new row can be detected
    count_sql = """select count(*) from oceanbase.__all_rootservice_job
                   where job_type = 'STATISTIC_PRIMARY_ZONE_ENTITY_COUNT';"""
    result = query(cur, count_sql)
    if 1 != len(result):
      logging.warn("unexpected sql output")
      raise Exception("unexpected sql output")
    job_count = result[0][0]
    # run job
    sql = "alter system run job 'STATISTIC_PRIMARY_ZONE_ENTITY_COUNT';"
    logging.info(sql)
    cur.execute(sql)
    # wait job finish
    times = 180
    ## first wait until the new job row shows up
    count_check = False
    while times > 0:
      result = query(cur, count_sql)
      if 1 != len(result):
        logging.warn("unexpected sql output")
        raise Exception("unexpected sql output")
      elif result[0][0] > job_count:
        count_check = True
        logging.info('statistic_primary_zone_entity_count job detected')
        break
      time.sleep(10)
      times -= 1
    if times == 0:
      raise Exception('statistic_primary_zone_entity_count job failed!')
    ## then poll the job status until SUCCESS
    status_sql = """select job_status from oceanbase.__all_rootservice_job
                    where job_type = 'STATISTIC_PRIMARY_ZONE_ENTITY_COUNT' order by job_id desc limit 1;"""
    status_check = False
    while times > 0 and count_check == True:
      result = query(cur, status_sql)
      if 0 == len(result):
        logging.warn("unexpected sql output")
        raise Exception("unexpected sql output")
      elif 1 != len(result) or 1 != len(result[0]):
        logging.warn("result len not match")
        raise Exception("result len not match")
      elif result[0][0] == "FAILED":
        logging.warn("run statistic_primary_zone_entity_count job faild")
        raise Exception("run statistic_primary_zone_entity_count job faild")
      elif result[0][0] == "INPROGRESS":
        logging.info('statistic_primary_zone_entity_count job is still running')
      elif result[0][0] == "SUCCESS":
        status_check = True
        break
      else:
        # BUGFIX: original used '%' with a '{0}' format string (TypeError)
        logging.warn("invalid result: {0}".format(result[0][0]))
        raise Exception("invalid result: {0}".format(result[0][0]))
      time.sleep(10)
      times -= 1
    if times == 0:
      raise Exception('check statistic_primary_zone_entity_count job failed!')
    if status_check == True and count_check == True:
      logging.info('check statistic_primary_zone_entity_count job success')
    else:
      logging.warn("run statistic_primary_zone_entity_count job faild")
      raise Exception("run statistic_primary_zone_entity_count job faild")
    # re-enable ddl if we disabled it
    if ori_enable_ddl == 1:
      actions.set_parameter(cur, 'enable_ddl', 'True')
  except Exception as e:
    logging.warn("run statistic_primary_zone_entity_count job failed")
    raise e
  logging.info("run statistic_primary_zone_entity_count job success")
def disable_major_freeze(conn, cur):
  """Turn off the enable_major_freeze parameter; re-raise after logging on failure."""
  try:
    actions.set_parameter(cur, "enable_major_freeze", 'False')
  except Exception as e:
    logging.warn("disable enable_major_freeze failed")
    raise e
  logging.info("disable enable_major_freeze finish")
def get_max_used_job_id(cur):
  """Return the largest job_id in __all_rootservice_job (0 when the table is empty).

  Raises Exception on unexpected query output.
  """
  try:
    sql = "select job_id from oceanbase.__all_rootservice_job order by job_id desc limit 1"
    results = query(cur, sql)
    if len(results) == 0:
      max_job_id = 0
    elif len(results) != 1 or len(results[0]) != 1:
      # BUGFIX: this path used `raise e` with no `e` bound (NameError)
      logging.warn("row cnt not match")
      raise Exception("row cnt not match")
    else:
      max_job_id = results[0][0]
    logging.info("get max_used_job_id:{0}".format(max_job_id))
    return max_job_id
  except Exception as e:
    logging.warn("failed to get max_used_job_id")
    raise e
def check_can_run_upgrade_job(cur, version):
  """Return whether RUN_UPGRADE_POST_JOB for `version` should be (re)run.

  True  : no job recorded yet, or the last attempt FAILED.
  False : the last attempt SUCCEEDED.
  Raises: the job is still INPROGRESS, or the query output is unexpected.
  """
  try:
    sql = """select job_status from oceanbase.__all_rootservice_job
             where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}'
             order by job_id desc limit 1""".format(version)
    results = query(cur, sql)
    bret = True
    if len(results) == 0:
      logging.info("upgrade job not created yet, should run upgrade job")
    elif len(results) != 1 or len(results[0]) != 1:
      # BUGFIX: these error paths used `raise e` with no `e` bound (NameError)
      logging.warn("row cnt not match")
      raise Exception("row cnt not match")
    elif "INPROGRESS" == results[0][0]:
      logging.warn("upgrade job still running, should wait")
      raise Exception("upgrade job still running, should wait")
    elif "SUCCESS" == results[0][0]:
      bret = False
      logging.info("execute upgrade job successfully, skip run upgrade job")
    elif "FAILED" == results[0][0]:
      logging.info("execute upgrade job failed, should run again")
    else:
      logging.warn("invalid job status: {0}".format(results[0][0]))
      raise Exception("invalid job status: {0}".format(results[0][0]))
    return bret
  except Exception as e:
    logging.warn("failed to check if upgrade job can run")
    raise e
def check_upgrade_job_result(cur, version, max_used_job_id):
  """Poll the RUN_UPGRADE_POST_JOB (job_id > max_used_job_id) for `version`.

  Polls every 10s, up to 181 iterations. Returns on SUCCESS; raises on
  FAILED status or unexpected output. NOTE(review): if the job never shows
  up or never finishes, the loop simply runs out without raising — behavior
  preserved from the original; confirm whether a timeout error is desired.
  """
  try:
    times = 0
    while (times <= 180):
      sql = """select job_status from oceanbase.__all_rootservice_job
               where job_type = 'RUN_UPGRADE_POST_JOB' and extra_info = '{0}'
               and job_id > {1} order by job_id desc limit 1""".format(version, max_used_job_id)
      results = query(cur, sql)
      if len(results) == 0:
        logging.info("upgrade job not created yet")
      elif len(results) != 1 or len(results[0]) != 1:
        # BUGFIX: these error paths used `raise e` with no `e` bound (NameError)
        logging.warn("row cnt not match")
        raise Exception("row cnt not match")
      elif "INPROGRESS" == results[0][0]:
        logging.info("upgrade job is still running")
      elif "SUCCESS" == results[0][0]:
        logging.info("execute upgrade job successfully")
        break
      elif "FAILED" == results[0][0]:
        logging.warn("execute upgrade job failed")
        raise Exception("execute upgrade job failed")
      else:
        logging.warn("invalid job status: {0}".format(results[0][0]))
        raise Exception("invalid job status: {0}".format(results[0][0]))
      times = times + 1
      time.sleep(10)
  except Exception as e:
    logging.warn("failed to check upgrade job result")
    raise e
def run_upgrade_job(conn, cur, version):
  """Run 'alter system run upgrade job' for `version` and wait for its result."""
  try:
    logging.info("start to run upgrade job, version:{0}".format(version))
    # pre check: skip entirely when the job already succeeded
    if check_can_run_upgrade_job(cur, version) == True:
      conn.autocommit = True
      # temporarily disable DDL while the upgrade job runs
      ori_enable_ddl = actions.get_ori_enable_ddl(cur)
      if ori_enable_ddl == 1:
        actions.set_parameter(cur, 'enable_ddl', 'False')
      # remember the newest job_id so the freshly created job can be identified
      max_used_job_id = get_max_used_job_id(cur)
      # run upgrade job
      upgrade_sql = """alter system run upgrade job '{0}'""".format(version)
      logging.info(upgrade_sql)
      cur.execute(upgrade_sql)
      # wait for the job and verify it succeeded
      check_upgrade_job_result(cur, version, max_used_job_id)
      # restore enable_ddl
      if ori_enable_ddl == 1:
        actions.set_parameter(cur, 'enable_ddl', 'True')
  except Exception as e:
    logging.warn("run upgrade job failed, version:{0}".format(version))
    raise e
  logging.info("run upgrade job success, version:{0}".format(version))
此差异已折叠。
此差异已折叠。
此差异已折叠。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import time
import mysql.connector
from mysql.connector import errorcode
import logging
import getopt
class UpgradeParams:
  # default log file name for the cluster health checker
  log_filename = 'upgrade_cluster_health_checker.log'
#### --------------start : my_error.py --------------
class MyError(Exception):
  """Script-local error type wrapping an arbitrary payload value."""
  def __init__(self, value):
    # keep the raw payload so callers can inspect it
    self.value = value
  def __str__(self):
    # repr() so string payloads are shown quoted
    return repr(self.value)
#### --------------start : actions.py 只允许执行查询语句--------------
class QueryCursor:
  """Thin wrapper around a DB-API cursor; only query statements are expected here."""
  __cursor = None
  def __init__(self, cursor):
    self.__cursor = cursor
  def exec_sql(self, sql, print_when_succ = True):
    """Execute sql and return the affected row count."""
    try:
      self.__cursor.execute(sql)
      affected = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute sql: %s, rowcount = %d', sql, affected)
      return affected
    except mysql.connector.Error as e:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise e
    except Exception as e:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise e
  def exec_query(self, sql, print_when_succ = True):
    """Execute sql and return (cursor description, all fetched rows)."""
    try:
      self.__cursor.execute(sql)
      rows = self.__cursor.fetchall()
      affected = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute query: %s, rowcount = %d', sql, affected)
      return (self.__cursor.description, rows)
    except mysql.connector.Error as e:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise e
    except Exception as e:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise e
#### ---------------end----------------------
#### --------------start : opt.py --------------
help_str = \
"""
Help:
""" +\
sys.argv[0] + """ [OPTIONS]""" +\
'\n\n' +\
'-I, --help Display this help and exit.\n' +\
'-V, --version Output version information and exit.\n' +\
'-h, --host=name Connect to host.\n' +\
'-P, --port=name Port number to use for connection.\n' +\
'-u, --user=name User for login.\n' +\
'-p, --password=name Password to use when connecting to server. If password is\n' +\
' not given it\'s empty string "".\n' +\
'-m, --module=name Modules to run. Modules should be a string combined by some of\n' +\
' the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
' system_variable_dml, special_action, all. "all" represents\n' +\
' that all modules should be run. They are splitted by ",".\n' +\
' For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
'-t, --timeout=name check timeout, default: 600(s).\n' + \
'\n\n' +\
'Maybe you want to run cmd like that:\n' +\
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'
version_str = """version 1.0.0"""
class Option:
  """One command line option, e.g. '-h' / '--host'.

  Class-level registries ensure that no two Option instances ever share a
  short or a long name.
  """
  # names already taken, shared by all instances
  __g_short_name_set = set([])
  __g_long_name_set = set([])
  __short_name = None
  __long_name = None
  __is_with_param = None
  __is_local_opt = None
  __has_value = None
  __value = None
  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
    if short_name in Option.__g_short_name_set:
      raise MyError('duplicate option short name: {0}'.format(short_name))
    if long_name in Option.__g_long_name_set:
      raise MyError('duplicate option long name: {0}'.format(long_name))
    Option.__g_short_name_set.add(short_name)
    Option.__g_long_name_set.add(long_name)
    self.__short_name = short_name
    self.__long_name = long_name
    self.__is_with_param = is_with_param
    self.__is_local_opt = is_local_opt
    self.__has_value = False
    if default_value is not None:
      self.set_value(default_value)
  def is_with_param(self):
    return self.__is_with_param
  def get_short_name(self):
    return self.__short_name
  def get_long_name(self):
    return self.__long_name
  def has_value(self):
    return self.__has_value
  def get_value(self):
    return self.__value
  def set_value(self, value):
    # setting any value (including None explicitly) marks the option as set
    self.__value = value
    self.__has_value = True
  def is_local_opt(self):
    return self.__is_local_opt
  def is_valid(self):
    return (self.__short_name is not None and self.__long_name is not None
            and self.__has_value and self.__value is not None)
# Global option table shared by every option helper below.  The list literal
# needs no '\' continuations (brackets already continue lines); the old trailing
# ']\' was a dangling line continuation that fused the bracket with the next line.
g_opts = [
  Option('I', 'help', False, True),
  Option('V', 'version', False, True),
  Option('h', 'host', True, False),
  Option('P', 'port', True, False),
  Option('u', 'user', True, False),
  Option('p', 'password', True, False, ''),
  # which modules to run; default is to run them all
  Option('m', 'module', True, False, 'all'),
  # log file path; each script's main() overrides this default
  Option('l', 'log-file', True, False),
  # timeout in seconds used by the various checks, default 600s
  Option('t', 'timeout', True, False, '600')
]
def change_opt_defult_value(opt_long_name, opt_default_val):
  """Set a new (default) value on the option whose long name matches."""
  global g_opts
  for candidate in g_opts:
    if candidate.get_long_name() != opt_long_name:
      continue
    candidate.set_value(opt_default_val)
    return
def has_no_local_opts():
  """Return True when no local option (help/version) carries a value."""
  global g_opts
  return not any(opt.is_local_opt() and opt.has_value() for opt in g_opts)
def check_db_client_opts():
  """Raise MyError on the first DB-client (non-local) option left unset."""
  global g_opts
  for opt in g_opts:
    if opt.is_local_opt() or opt.has_value():
      continue
    raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
        .format(opt.get_short_name(), sys.argv[0]))
def parse_option(opt_name, opt_val):
  """Store opt_val on whichever registered option matches opt_name."""
  global g_opts
  for opt in g_opts:
    accepted_forms = ('-' + opt.get_short_name(), '--' + opt.get_long_name())
    if opt_name in accepted_forms:
      opt.set_value(opt_val)
def parse_options(argv):
  """Parse argv with getopt according to g_opts, then validate DB options."""
  global g_opts
  short_opt_str = ''
  long_opt_list = []
  # build both getopt specs in one pass; ':'/'=' mark value-taking options
  for opt in g_opts:
    takes_param = opt.is_with_param()
    short_opt_str += opt.get_short_name() + (':' if takes_param else '')
    long_opt_list.append(opt.get_long_name() + ('=' if takes_param else ''))
  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
  for (opt_name, opt_val) in opts:
    parse_option(opt_name, opt_val)
  if has_no_local_opts():
    check_db_client_opts()
def deal_with_local_opt(opt):
  """Execute one local option: print the help or the version text."""
  long_name = opt.get_long_name()
  if long_name == 'help':
    print(help_str)
  elif long_name == 'version':
    print(version_str)
def deal_with_local_opts():
  """Run the first local option that has a value; error out if none does."""
  global g_opts
  if has_no_local_opts():
    raise MyError('no local options, can not deal with local options')
  for opt in g_opts:
    if opt.is_local_opt() and opt.has_value():
      deal_with_local_opt(opt)
      # handle only the first matching local option
      return
def get_opt_host():
  """Return the value of the --host option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'host'), None)
def get_opt_port():
  """Return the value of the --port option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'port'), None)
def get_opt_user():
  """Return the value of the --user option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'user'), None)
def get_opt_password():
  """Return the value of the --password option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'password'), None)
def get_opt_module():
  """Return the value of the --module option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'module'), None)
def get_opt_log_file():
  """Return the value of the --log-file option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'log-file'), None)
def get_opt_timeout():
  """Return the value of the --timeout option (None if never set)."""
  return next((o.get_value() for o in g_opts if o.get_long_name() == 'timeout'), None)
#### ---------------end----------------------
#### --------------start : do_upgrade_pre.py--------------
def config_logging_module(log_filenamme):
  """Configure root logging: INFO+ goes to log_filenamme (truncated) and to stdout."""
  logging.basicConfig(level=logging.INFO,
                      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S',
                      filename=log_filenamme,
                      filemode='w')
  # shared formatter for the extra stdout handler
  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
  # mirror INFO and above to sys.stdout as well
  stdout_handler = logging.StreamHandler(sys.stdout)
  stdout_handler.setLevel(logging.INFO)
  stdout_handler.setFormatter(formatter)
  # attach the stdout handler to the root logger
  logging.getLogger('').addHandler(stdout_handler)
#### ---------------end----------------------
#### START ####
# 1. check that paxos replicas are in sync and none are missing
def check_paxos_replica(query_cur):
  """Ensure every paxos replica is in sync (offline/readonly replicas excluded)."""
  (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from __all_virtual_clog_stat where is_in_sync = 0 and is_offline = 0 and replica_type != 16""")
  unsync_cnt = results[0][0]
  if unsync_cnt > 0:
    raise MyError('{0} replicas unsync, please check'.format(unsync_cnt))
  # TODO: also check for missing paxos replicas
  logging.info('check paxos replica success')
# 2. check that no balance or locality change is in progress
def check_rebalance_task(query_cur):
  """Make sure no locality change or rebalance task is currently running."""
  # pending locality-change jobs
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from __all_rootservice_job where job_status='INPROGRESS' and return_code is null""")
  locality_jobs = results[0][0]
  if locality_jobs > 0:
    raise MyError('{0} locality tasks is doing, please check'.format(locality_jobs))
  # pending rebalance tasks
  (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from __all_virtual_rebalance_task_stat""")
  rebalance_tasks = results[0][0]
  if rebalance_tasks > 0:
    raise MyError('{0} rebalance tasks is doing, please check'.format(rebalance_tasks))
  logging.info('check rebalance task success')
# 3. check the cluster status
def check_cluster_status(query_cur):
  """Fail unless the cluster is idle (no major merge) and last_merged_version >= 2."""
  # 4.1 the cluster must not be in a major merge
  (desc, results) = query_cur.exec_query("""select info from __all_zone where zone='' and name='merge_status'""")
  # plain != replaces the Python2-only cmp(); equivalent for string compare
  if results[0][0] != 'IDLE':
    raise MyError('global status expected = {0}, actual = {1}'.format('IDLE', results[0][0]))
  logging.info('check cluster status success')
  # 4.2 the last merged version must be >= 2
  (desc, results) = query_cur.exec_query("""select cast(value as unsigned) value from __all_zone where zone='' and name='last_merged_version'""")
  if results[0][0] < 2:
    raise MyError('global last_merged_version expected >= 2 actual = {0}'.format(results[0][0]))
  logging.info('check global last_merged_version success')
# 4. check whether any tenant's partition count exceeds its memory limit
def check_tenant_part_num(query_cur):
  """Check that no tenant's partition count on any server exceeds what its unit memory allows."""
  # count, per server, how many partitions each tenant holds
  (desc, res_part_num) = query_cur.exec_query("""select svr_ip, svr_port, table_id >> 40 as tenant_id, count(*) as part_num from __all_virtual_clog_stat group by 1,2,3 order by 1,2,3""")
  # max_memory of every tenant unit on every server
  (desc, res_unit_memory) = query_cur.exec_query("""select u.svr_ip, u.svr_port, t.tenant_id, uc.max_memory, p.replica_type from __all_unit u, __All_resource_pool p, __all_tenant t, __all_unit_config uc where p.resource_pool_id = u.resource_pool_id and t.tenant_id = p.tenant_id and p.unit_config_id = uc.unit_config_id""")
  # memstore_limit_percentage configured on every server
  (desc, res_svr_memstore_percent) = query_cur.exec_query("""select svr_ip, svr_port, name, value from __all_virtual_sys_parameter_stat where name = 'memstore_limit_percentage'""")
  part_static_cost = 128 * 1024
  part_dynamic_cost = 400 * 1024
  # reserve 512 partitions because tables may still be created during the upgrade
  part_num_reserved = 512
  for line in res_part_num:
    svr_ip = line[0]
    svr_port = line[1]
    tenant_id = line[2]
    part_num = line[3]
    for uline in res_unit_memory:
      uip = uline[0]
      uport = uline[1]
      utid = uline[2]
      umem = uline[3]
      utype = uline[4]
      if svr_ip == uip and svr_port == uport and tenant_id == utid:
        for mpline in res_svr_memstore_percent:
          mpip = mpline[0]
          mpport = mpline[1]
          if mpip == uip and mpport == uport:
            mspercent = int(mpline[3])
            mem_limit = umem
            if 0 == utype:
              # a FULL (type 0) unit must reserve part of its memory for the memstore
              # NOTE(review): '/' is integer division here under Python 2 — confirm before porting to Python 3
              mem_limit = umem * (100 - mspercent) / 100
            part_num_limit = mem_limit / (part_static_cost + part_dynamic_cost / 10);
            if part_num_limit <= 1000:
              # small units get the stricter (full dynamic cost) limit
              part_num_limit = mem_limit / (part_static_cost + part_dynamic_cost)
            if part_num >= (part_num_limit - part_num_reserved):
              raise MyError('{0} {1} {2} exceed tenant partition num limit, please check'.format(line, uline, mpline))
        break
  logging.info('check tenant partition num success')
# 5. check for observers that host tenant partitions but have no unit of that tenant
def check_tenant_resource(query_cur):
  """Fail if some observer hosts partitions of a tenant without owning a unit of it."""
  (desc, res_unit) = query_cur.exec_query("""select tenant_id, svr_ip, svr_port from __all_virtual_partition_info where (tenant_id, svr_ip, svr_port) not in (select tenant_id, svr_ip, svr_port from __all_unit, __all_resource_pool where __all_unit.resource_pool_id = __all_resource_pool.resource_pool_id group by tenant_id, svr_ip, svr_port) group by tenant_id, svr_ip, svr_port""")
  # any row at all is an error; report the first one
  if res_unit:
    raise MyError('{0} tenant unit not exist but partition exist'.format(res_unit[0]))
  logging.info("check tenant resource success")
# 6. check that progressive_merge_round has been raised to 1 everywhere
def check_progressive_merge_round(query_cur):
  """Verify progressive_merge_round reached 1 for every main table and every index."""
  (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_virtual_table where progressive_merge_round = 0 and table_type not in (1,2,4) and data_table_id = 0""")
  stale_main_tables = results[0][0]
  if stale_main_tables != 0:
    raise MyError("""progressive_merge_round of main table should all be 1""")
  (desc, results) = query_cur.exec_query("""select count(*) as cnt from __all_virtual_table where progressive_merge_round = 0 and table_type not in (1,2,4) and data_table_id > 0 and data_table_id in (select table_id from __all_virtual_table where table_type not in (1,2,4) and data_table_id = 0)""")
  stale_indexes = results[0][0]
  if stale_indexes != 0:
    raise MyError("""progressive_merge_round of index should all be 1""")
  logging.info("""check progressive_merge_round status success""")
# primary cluster status check
def check_primary_cluster_sync_status(query_cur, timeout):
  """On a primary cluster, wait up to `timeout` seconds until every standby is in sync.

  Raises MyError when the primary row is missing or a standby is still behind
  when the timeout expires.
  """
  (desc, res) = query_cur.exec_query("""select current_scn from oceanbase.v$ob_cluster where cluster_role='PRIMARY' and cluster_status='VALID'""")
  if len(res) != 1:
    raise MyError('query results count is not 1')
  query_sql = "select count(*) from oceanbase.v$ob_standby_status where cluster_role != 'PHYSICAL STANDBY' or cluster_status != 'VALID' or current_scn < {0}".format(res[0][0])
  # poll once a second until every standby catches up or time runs out
  # (removed leftover debug `print times`)
  times = timeout
  while times > 0:
    (desc, res1) = query_cur.exec_query(query_sql)
    if len(res1) == 1 and res1[0][0] == 0:
      break
    time.sleep(1)
    times -= 1
  if times == 0:
    raise MyError("there exists standby cluster not synchronizing, checking primary cluster status failed!!!")
  else:
    logging.info("check primary cluster sync status success")
# standby cluster status check
def check_standby_cluster_sync_status(query_cur, timeout):
  """On a standby cluster, wait up to `timeout` seconds until it is fully synchronized."""
  (desc, res) = query_cur.exec_query("""select time_to_usec(now(6)) from dual""")
  query_sql = "select count(*) from oceanbase.v$ob_cluster where (cluster_role != 'PHYSICAL STANDBY') or (cluster_status != 'VALID') or (current_scn < {0}) or (switchover_status != 'NOT ALLOWED')".format(res[0][0])
  # poll once a second until the standby catches up or time runs out
  remaining = timeout
  while remaining > 0:
    (desc, rows) = query_cur.exec_query(query_sql)
    if len(rows) == 1 and rows[0][0] == 0:
      break
    time.sleep(1)
    remaining -= 1
  if remaining == 0:
    raise MyError('current standby cluster not synchronizing, please check!!!')
  else:
    logging.info("check standby cluster sync status success")
# decide whether this cluster is a primary or a standby
def check_cluster_sync_status(query_cur, timeout):
  """Dispatch the sync-status check according to the cluster's role."""
  (desc, res) = query_cur.exec_query("""select cluster_role from oceanbase.v$ob_cluster""")
  is_primary = (res[0][0] == 'PRIMARY')
  if is_primary:
    check_primary_cluster_sync_status(query_cur, timeout)
  else:
    check_standby_cluster_sync_status(query_cur, timeout)
# checks that run before the upgrade starts
def do_check(my_host, my_port, my_user, my_passwd, upgrade_params, timeout):
  """Connect to the observer and run every pre-upgrade check in turn.

  Any failure is logged with its traceback and re-raised; the cursor and
  connection are always closed.
  """
  try:
    conn = mysql.connector.connect(user = my_user,
                                   password = my_passwd,
                                   host = my_host,
                                   port = my_port,
                                   database = 'oceanbase',
                                   raise_on_warnings = True)
    conn.autocommit = True
    cur = conn.cursor(buffered=True)
    try:
      query_cur = QueryCursor(cur)
      check_paxos_replica(query_cur)
      check_rebalance_task(query_cur)
      check_cluster_status(query_cur)
      check_tenant_part_num(query_cur)
      check_tenant_resource(query_cur)
      check_cluster_sync_status(query_cur, timeout)
    except Exception:
      logging.exception('run error')
      # bare raise keeps the original traceback (Python 2 `raise e` does not)
      raise
    finally:
      cur.close()
      conn.close()
  except mysql.connector.Error:
    logging.exception('connection error')
    raise
  except Exception:
    logging.exception('normal error')
    raise
if __name__ == '__main__':
  upgrade_params = UpgradeParams()
  change_opt_defult_value('log-file', upgrade_params.log_filename)
  parse_options(sys.argv[1:])
  if not has_no_local_opts():
    deal_with_local_opts()
  else:
    check_db_client_opts()
    log_filename = get_opt_log_file()
    upgrade_params.log_filename = log_filename
    # configure logging only after option parsing so earlier steps cannot clobber the log file
    config_logging_module(upgrade_params.log_filename)
    try:
      host = get_opt_host()
      port = int(get_opt_port())
      user = get_opt_user()
      password = get_opt_password()
      timeout = int(get_opt_timeout())
      # NOTE(review): the password is written to the log in clear text; consider masking it
      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", log-file=\"%s\", timeout=%s', \
          host, port, user, password, log_filename, timeout)
      do_check(host, port, user, password, upgrade_params, timeout)
    except mysql.connector.Error:
      logging.exception('mysql connector error')
      raise
    except Exception:
      logging.exception('normal error')
      raise
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册