# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy.  If not, see https://www.gnu.org/licenses/.


from __future__ import absolute_import, division, print_function

import re
import time
from copy import deepcopy

from _stdio import SafeStdio

# Keys naming the supported optimize types inside an optimize config.
SQL_FILE = "exec_sql_file"
EXEC_SQL = "exec_sql"
VARIABLES = 'variables'
SYSTEM_CONFIG = 'system_config'


class OptimizeItem(object):
    """Namespace for the typed value wrappers used when comparing settings."""

    class OptimizeItemType(object):
        """Base wrapper: keeps the raw value and a normalized, comparable one.

        ``_origin`` holds the value as given (used for ``str()`` and hashing),
        ``_value`` the normalized form produced by ``_format`` (used for all
        comparisons).
        """

        TYPE_STR = None

        def __init__(self, s):
            """
            :param s: raw value to wrap
            :raises Exception: when ``_format`` cannot interpret ``s``
            """
            try:
                self._origin = s
                self._value = 0
                self._format()
            except Exception:
                # Any parsing failure is reported uniformly as a type mismatch.
                raise Exception("'%s' is not %s" % (self._origin, self._type_str))

        @property
        def _type_str(self):
            # Cached on first use; defaults to the concrete class name.
            if self.TYPE_STR is None:
                self.TYPE_STR = str(self.__class__.__name__).split('.')[-1]
            return self.TYPE_STR

        def _format(self):
            """Normalize ``self._origin`` into ``self._value`` (subclass hook)."""
            raise NotImplementedError

        def __str__(self):
            return str(self._origin)

        def __repr__(self):
            return self.__str__()

        def __hash__(self):
            return hash(self._origin)

        @property
        def __cmp_value__(self):
            return self._value

        # Comparisons treat ``None`` as smaller than any item and otherwise
        # compare the normalized values of both operands.
        def __eq__(self, value):
            if value is None:
                return False
            return self.__cmp_value__ == value.__cmp_value__

        def __ne__(self, value):
            # Explicit so that ``!=`` mirrors ``__eq__`` on Python 2 as well.
            return not self.__eq__(value)

        def __gt__(self, value):
            if value is None:
                return True
            return self.__cmp_value__ > value.__cmp_value__

        def __ge__(self, value):
            if value is None:
                return True
            return self.__eq__(value) or self.__gt__(value)

        def __lt__(self, value):
            if value is None:
                return False
            return self.__cmp_value__ < value.__cmp_value__

        def __le__(self, value):
            if value is None:
                return False
            return self.__eq__(value) or self.__lt__(value)

    class Moment(OptimizeItemType):
        """Time of day ``HH:MM`` (or ``DISABLE``), normalized to minutes."""

        def _format(self):
            if not self._origin:
                self._value = 0
                return
            if self._origin.upper() == 'DISABLE':
                self._value = 0
                return
            # A failed match leaves ``r`` as None; the resulting error is
            # converted by ``__init__``.
            r = re.match(r'^(\d{1,2}):(\d{1,2})$', self._origin)
            h, m = r.groups()
            h, m = int(h), int(m)
            # Minutes run 0..59; 60 is not a valid minute of an hour.
            if 0 <= h <= 23 and 0 <= m <= 59:
                self._value = h * 60 + m
            else:
                raise Exception('Invalid Value')

    class Time(OptimizeItemType):
        """Duration with an optional unit suffix, normalized to seconds."""

        UNITS = {
            'ns': 0.000000001,
            'us': 0.000001,
            'ms': 0.001,
            's': 1,
            'm': 60,
            'h': 3600,
            'd': 86400
        }

        def _format(self):
            if not self._origin:
                self._value = 0
                return
            self._origin = str(self._origin).strip()
            if self._origin.isdigit():
                # A bare number is taken as seconds.
                n = self._origin
                unit = self.UNITS['s']
            else:
                r = re.match(r'^(\d+)(\w+)$', self._origin.lower())
                n, u = r.groups()
            	# Unknown suffixes yield None and are rejected below.
                unit = self.UNITS.get(u.lower())
            if unit:
                self._value = int(n) * unit
            else:
                raise Exception('Invalid Value')

    class Capacity(OptimizeItemType):
        """Size with an optional unit suffix, normalized to bytes."""

        UNITS = {"B": 1, "K": 1 << 10, "M": 1 << 20, "G": 1 << 30, "T": 1 << 40, 'P': 1 << 50}

        def _format(self):
            if not self._origin:
                self._value = 0
                return
            self._origin = str(self._origin).strip()
            if self._origin.isdigit():
                # A bare number is taken as megabytes.
                n = self._origin
                unit = self.UNITS['M']
            else:
                r = re.match(r'^(\d+)(\w)B?$', self._origin.upper())
                n, u = r.groups()
                unit = self.UNITS.get(u.upper())
            if unit:
                self._value = int(n) * unit
            else:
                raise Exception('Invalid Value')

    class StringList(OptimizeItemType):
        """Semicolon-separated string, normalized to a list of parts."""

        def _format(self):
            if self._origin:
                self._origin = str(self._origin).strip()
                self._value = self._origin.split(';')
            else:
                self._value = []

    class Double(OptimizeItemType):
        """Floating point value; empty input normalizes to 0."""

        def _format(self):
            self._value = float(self._origin) if self._origin else 0

    class Boolean(OptimizeItemType):
        """Boolean given as bool, ``true``/``false`` text, or a number."""

        def _format(self):
            if isinstance(self._origin, bool):
                self._value = self._origin
                return
            _origin = str(self._origin).lower()
            if _origin == 'true':
                self._value = True
            elif _origin == 'false':
                self._value = False
            elif _origin.isdigit():
                # Convert through int so that the text "0" is False; bool() of
                # a non-empty string would always be True.
                self._value = bool(int(_origin))
            else:
                raise Exception('%s is not Boolean' % _origin)

    class Integer(OptimizeItemType):
        """Integer value; ``None`` normalizes to 0."""

        def _format(self):
            if self._origin is None:
                self._value = 0
                self._origin = 0
            else:
                _origin = str(self._origin)
                try:
                    # ``value`` is assigned alongside ``_value`` exactly as the
                    # original code did, in case external code reads it.
                    self.value = self._value = int(_origin)
                except Exception:
                    raise Exception('%s is not Integer' % _origin)

    class String(OptimizeItemType):
        """Plain string; falsy input normalizes to the empty string."""

        def _format(self):
            self._value = str(self._origin) if self._origin else ''


class SqlFile(object):
    """Optimize item that feeds a SQL file to the obclient command line."""

    def __init__(self, path, entrance, sys=False, **kwargs):
        """
        :param path: path of the SQL file to execute
        :param entrance: owning entrance; supplies ``envs``
        :param sys: when true, run with the sys tenant credentials from envs
        :param kwargs: extra values merged into the command template arguments
        """
        self.path = path
        self.entrance = entrance
        self.exec_by_sys = sys
        self.extra_kwargs = kwargs
        self.need_restart = False

    def _get_sql(self, **kwargs):
        """Build the obclient shell command that executes the SQL file.

        Argument precedence (lowest to highest): entrance envs, ``sql_file``
        and the extra kwargs given at construction, sys-tenant overrides,
        then the call-time ``kwargs``.

        NOTE(review): the values are interpolated into the command line
        without shell quoting, so they must come from trusted configuration.
        """
        sql_kwargs = deepcopy(self.entrance.envs)
        sql_kwargs.update(sql_file=self.path, **self.extra_kwargs)
        if self.exec_by_sys:
            sql_kwargs.update(
                tenant=sql_kwargs.get('sys_tenant', 'sys'),
                user=sql_kwargs.get('sys_user', 'root'),
                password=sql_kwargs.get('sys_password', ""),
                database=""
            )
        sql_kwargs.update(kwargs)
        passwd = sql_kwargs.get('password', "")
        database = sql_kwargs.get('database', "")
        # The password and database flags are omitted entirely when empty.
        sql_kwargs['pass_str'] = '-p{}'.format(passwd) if passwd else ""
        sql_kwargs['db_str'] = '-D{}'.format(database) if database else ""
        return "{obclient_bin} -h{host} -P{port} -u{user}@{tenant} {pass_str} {db_str} -A < {sql_file}".format(
            **sql_kwargs)

    def optimize(self, client, stdio=None, **kwargs):
        """Run the SQL file through ``client.execute_command``.

        :return: True on success
        :raises Exception: when the command result is falsy
        """
        ret = client.execute_command(self._get_sql(**kwargs), stdio=stdio)
        if ret:
            return True
        raise Exception('failed to execute {} {}'.format(self.path, ret.stderr))

    def recover(self, *args, **kwargs):
        """Nothing is recorded to undo for a SQL file; always returns None."""
        return None


class Variable(object):
    """A named setting that is queried, conditionally modified and restorable."""

    # Maps the ``value_type`` names accepted in the config to wrapper classes.
    TYPES = {
        'DOUBLE': OptimizeItem.Double,
        'BOOL': OptimizeItem.Boolean,
        'INT': OptimizeItem.Integer,
        'STRING': OptimizeItem.String,
        'MOMENT': OptimizeItem.Moment,
        'TIME': OptimizeItem.Time,
        'CAPACITY': OptimizeItem.Capacity,
        'STRING_LIST': OptimizeItem.StringList
    }

    def __init__(self, value, entrance, name=None, value_type=None, condition="lambda n, o: n != o",
                 optimizer="default", expression=False, query_key='value', **kwargs):
        """
        :param value: target value, or an expression when ``expression`` is set
        :param entrance: owning entrance; supplies ``envs`` and optimizers
        :param name: name of the setting (``sleep`` is handled specially)
        :param value_type: optional key of ``TYPES`` used to wrap values
        :param condition: source of a two-argument callable
            ``(new_value, old_value)``; the setting is modified when it
            returns a truthy result
        :param optimizer: name of the optimizer to fetch from the entrance
        :param expression: when true, ``value`` is evaluated against the envs
        :param query_key: key of the query result row holding the current value
        :param kwargs: extra values merged into the optimizer arguments
        """
        self.entrance = entrance
        self.name = name
        self._value = value
        self._origin_value = None
        self._value_type = value_type
        self._condition = condition
        self.extra_kwargs = kwargs
        self._optimizer_name = optimizer
        self._optimizer = None
        self.need_restart = False
        self.expression = expression
        self.query_key = query_key

    @property
    def value_type(self):
        """Wrapper class for the configured type, or None when not in TYPES."""
        if self._value_type in self.TYPES:
            return self.TYPES[self._value_type]

    @property
    def origin_value(self):
        """Value found before optimizing; None until a query has succeeded."""
        return self._origin_value

    @origin_value.setter
    def origin_value(self, value):
        # Normalized the same way as ``value`` so that both sides compare alike.
        if not self.value_type and isinstance(value, str) and value.isdigit():
            value = int(value)
        if self.value_type:
            value = self.value_type(value)
        self._origin_value = value

    @property
    def optimizer(self):
        """Optimizer resolved lazily from the entrance by name."""
        if not self._optimizer:
            self._optimizer = self.entrance.get_optimizer(self._optimizer_name)
        return self._optimizer

    def optimize_arguments(self, **kwargs):
        """Build the optimizer arguments.

        Precedence (lowest to highest): entrance envs, name/value and the
        extra kwargs given at construction, then the call-time ``kwargs``.
        """
        ret = deepcopy(self.entrance.envs)
        ret.update(name=self.name, value=self.value, **self.extra_kwargs)
        ret.update(kwargs)
        return ret

    def _sleep(self, stdio=None):
        """Handle the pseudo-setting ``sleep``: pause for ``value`` seconds."""
        seconds = self.value
        stdio.verbose('sleep {}'.format(seconds))
        time.sleep(seconds)
        return True

    def optimize(self, cursor, stdio=None, **kwargs):
        """Query the current value and modify it when the condition holds.

        :return: True when the setting was modified (or slept), False when it
            could not be queried or the condition was not met
        :raises Exception: when the modification fails or the condition is
            invalid
        """
        if self.name == 'sleep':
            return self._sleep(stdio)
        kwargs = self.optimize_arguments(**kwargs)
        ret = self.optimizer.query(cursor, stdio=stdio, **kwargs)
        if not ret or ret.get(self.query_key) is None:
            stdio.warn('failed to query {}, skip it.'.format(self.name))
            return False
        self.origin_value = ret[self.query_key]
        target = self.value
        stdio.verbose('origin_value {}({}) target_value {}({}) condition {}'.format(
            self.origin_value, type(self.origin_value).__name__, target, type(target).__name__, self._condition))
        if self.meet_the_condition(target, self.origin_value):
            if not self.optimizer.modify(cursor, stdio=stdio, **kwargs):
                raise Exception('fail to optimize {} to {}'.format(self.name, target))
            return True
        # Condition not met: nothing changed, so nothing to record for recovery.
        return False

    def recover(self, cursor, stdio=None, **kwargs):
        """Write the recorded original value back through the optimizer."""
        if self.name == 'sleep':
            return self._sleep(stdio)
        if self.origin_value is None:
            return True
        return self.optimizer.modify(cursor, stdio=stdio, **self.optimize_arguments(value=self.origin_value, **kwargs))

    @property
    def value(self):
        """Target value: evaluated if an expression, then type-normalized."""
        if self.expression:
            # NOTE(review): eval of configuration text; the optimize config
            # must be trusted.
            value = eval(self._value, {}, self.entrance.envs)
        else:
            value = self._value
        if not self.value_type and isinstance(value, str) and value.isdigit():
            return int(value)
        if self.value_type:
            value = self.value_type(value)
        return value

    def meet_the_condition(self, new_value, old_value):
        """Evaluate the configured condition for the given pair of values.

        :return: whatever the condition callable returns
        :raises Exception: when the condition cannot be built or called
        """
        try:
            envs = deepcopy(self.entrance.envs)
            # NOTE(review): eval of configuration text; the optimize config
            # must be trusted.
            _condition = eval(self._condition, envs, envs)
            # These names stay visible to the condition as globals.
            envs.update(
                new_value=new_value,
                old_value=old_value,
                _condition=_condition
            )
            return _condition(new_value, old_value)
        except Exception:
            raise Exception("Invalid condition: {}".format(self._condition))


class ExecSql(Variable):
    """Optimize item that runs its optimizer's modify SQL unconditionally."""

    OPTIMIZE_TYPE = EXEC_SQL

    def __init__(self, name, entrance, value=None, **kwargs):
        super(ExecSql, self).__init__(name=name, entrance=entrance, value=value, **kwargs)

    def optimize(self, cursor, stdio=None, **kwargs):
        """Execute the SQL without querying or checking a condition first.

        :return: True on success
        :raises Exception: when the optimizer reports failure
        """
        kwargs = self.optimize_arguments(**kwargs)
        if not self.optimizer.modify(cursor, stdio=stdio, **kwargs):
            raise Exception('fail to exec sql {} '.format(self.name))
        return True

    def recover(self, cursor, stdio=None, **kwargs):
        """No original state is recorded, so there is nothing to restore."""
        return


class SystemConfig(Variable):
    """Variable bound to a tenant that may require a restart to take effect."""

    OPTIMIZE_TYPE = SYSTEM_CONFIG

    def __init__(self, name, entrance, value, need_restart=False, **kwargs):
        super(SystemConfig, self).__init__(name=name, entrance=entrance, value=value, **kwargs)
        self.need_restart = need_restart

    def optimize_arguments(self, **kwargs):
        """Same as ``Variable.optimize_arguments`` plus a ``tenant_where``
        filter built from the ``tenant`` entry of the envs; call-time
        ``kwargs`` still take precedence over it."""
        ret = super(SystemConfig, self).optimize_arguments()
        ret['tenant_where'] = 'tenant="{}"'.format(self.entrance.envs.get('tenant'))
        ret.update(kwargs)
        return ret


class OptimizeObjectEntrance(object):
    """Ordered group of optimize items of one type.

    Subclasses set ``ITEM_CLASS`` (the item class to instantiate) and
    ``OPTIMIZE_TYPE`` (the config key the group is registered under).
    """

    OPTIMIZE_TYPE = None
    ITEM_CLASS = None

    def __init__(self, kwargs_list):
        """
        :param kwargs_list: list of dicts; each one is passed as keyword
            arguments to ``ITEM_CLASS``, e.g.
            [ {'key1': value1, 'key2': value2} ]
        """
        self._optimize_config = None
        self.items = []
        # Items whose optimize() returned a truthy result, in execution order.
        self.items_done = []
        self.optimizer_map = {}
        for kwargs in kwargs_list:
            self.items.append(self.ITEM_CLASS(entrance=self, **kwargs))
        self._need_restart = False

    @property
    def envs(self):
        """Environment of the linked optimize config (set by mark_relationship)."""
        return self._optimize_config.envs

    def optimize(self, stdio=None, disable_restart=False, *args, **kwargs):
        """Optimize every item in order.

        :param disable_restart: skip items that need a restart
        :return: True when all items ran without raising, False otherwise
        """
        try:
            for item in self.items:
                if disable_restart and item.need_restart:
                    continue
                ret = item.optimize(*args, stdio=stdio, **kwargs)
                if ret:
                    self.items_done.append(item)
                    if item.need_restart:
                        self._need_restart = True
            return True
        except Exception:
            stdio.exception('Failed to optimize {}'.format(self.OPTIMIZE_TYPE))
            return False

    def recover(self, stdio=None, *args, **kwargs):
        """Recover the completed items in reverse order of their execution."""
        try:
            for item in reversed(self.items_done):
                item.recover(*args, stdio=stdio, **kwargs)
                if item.need_restart:
                    self._need_restart = True
        except Exception:
            stdio.exception('Failed to recover {}'.format(self.OPTIMIZE_TYPE))

    def mark_relationship(self, optimizers=None, optimize_config=None):
        """Attach the optimizer map and the owning optimize config."""
        self.optimizer_map = optimizers
        self._optimize_config = optimize_config

    def get_optimizer(self, name='default'):
        """Return the optimizer registered under ``name`` (KeyError if absent)."""
        return self.optimizer_map[name]

    @property
    def need_restart(self):
        """True once an optimized or recovered item required a restart."""
        return self._need_restart


class VariableEntrance(OptimizeObjectEntrance):
    ITEM_CLASS = Variable
    OPTIMIZE_TYPE = VARIABLES


class SystemConfigEntrance(OptimizeObjectEntrance):
    ITEM_CLASS = SystemConfig
    OPTIMIZE_TYPE = SYSTEM_CONFIG


class SqlFileEntrance(OptimizeObjectEntrance):
    ITEM_CLASS = SqlFile
    OPTIMIZE_TYPE = SQL_FILE


class ExecSqlEntrance(OptimizeObjectEntrance):
    ITEM_CLASS = ExecSql
    OPTIMIZE_TYPE = EXEC_SQL


# Maps each optimize type key to the entrance class that handles it.
OptimizeObjectMap = {
    VARIABLES: VariableEntrance,
    SYSTEM_CONFIG: SystemConfigEntrance,
    EXEC_SQL: ExecSqlEntrance,
    SQL_FILE: SqlFileEntrance
}


class OptimizeConfig(SafeStdio):
    """Holds the loaded entrances and optimizers plus the shared envs."""

    def __init__(self, config_dict, optimizer_dict):
        """
        :param config_dict: nested dict indexed as
            config_dict[component][stage][optimize_type] -> entrance object
        :param optimizer_dict: nested dict indexed as
            optimizer_dict[component][optimize_type][name] -> optimizer object
        """
        self.config_dict = config_dict
        self.optimizer_dict = optimizer_dict
        self.envs = {}

    def set_exec_sql(self, component, stage, sql_kwargs_list, stdio=None):
        """Replace the optimization of a stage with SQL file execution.

        :param component: component name
        :param stage: stage name
        :param sql_kwargs_list: list of ``SqlFile`` keyword dicts, e.g.
            [
                {
                    "path": "",
                    "sys": false  # execute sql file in sys tenant
                }
            ]
        :return: None
        """
        stdio.verbose(
            'Set the optimization of the component {} in the {} stage to execute the sql file'.format(component, stage))
        # setdefault: a component without any loaded config is still accepted.
        self.config_dict.setdefault(component, {})[stage] = {SQL_FILE: SqlFileEntrance(kwargs_list=sql_kwargs_list)}

    def get_optimize_entrances(self, component, stage):
        """Return the entrances of a component stage, linked to this config
        and to the optimizers registered for their optimize type."""
        optimize_entrance_map = self.config_dict.get(component, {}).get(stage, {})
        optimize_entrances = []
        for optimize_entrance in optimize_entrance_map.values():
            optimize_entrance.mark_relationship(
                optimize_config=self,
                optimizers=self._get_optimizer_map(component, optimize_entrance.OPTIMIZE_TYPE))
            optimize_entrances.append(optimize_entrance)
        return optimize_entrances

    def _get_optimizer_map(self, component, optimize_type):
        return self.optimizer_dict.get(component, {}).get(optimize_type, {})

    def set_envs(self, envs):
        """Replace the envs shared by every linked entrance."""
        self.envs = envs


class CursorOptimizer(SafeStdio):
    """Runs templated query/modify SQL statements through a database cursor."""

    def __init__(self, query_sql=None, modify_sql=None, *args, **kwargs):
        """
        :param query_sql: ``str.format`` template of the query statement
        :param modify_sql: ``str.format`` template of the modify statement;
            its ``{value}`` placeholder is passed as a cursor parameter
        """
        super(CursorOptimizer, self).__init__(*args, **kwargs)
        self.query_sql = query_sql
        self.origin_modify_sql = modify_sql
        self._modify_sql = None

    def query(self, cursor, stdio=None, *args, **kwargs):
        """Execute the query template and return the first row.

        :return: the row from ``cursor.fetchone()``; None when no query SQL
            is configured or when execution fails (the error is logged)
        """
        if not self.query_sql:
            stdio.verbose('no query sql')
            return
        try:
            sql = self.query_sql.format(**kwargs)
            stdio.verbose('execute sql: %s' % sql)
            cursor.execute(sql)
            return cursor.fetchone()
        except Exception:
            stdio.exception("")

    @property
    def modify_sql(self):
        """Modify template with ``{value}`` turned into a ``%s`` parameter
        marker; empty when no modify SQL was configured."""
        if self._modify_sql is None:
            # Guard against a missing template so that modify() can report
            # 'no modify sql' instead of failing on None.replace.
            self._modify_sql = (self.origin_modify_sql or '').replace('{value}', '%s')
        return self._modify_sql

    def modify(self, cursor, stdio=None, value=None, *args, **kwargs):
        """Execute the modify template, binding ``value`` as a parameter.

        :return: True on success, False when execution fails (the error is
            logged), None when no modify SQL is configured
        """
        if not self.modify_sql:
            stdio.verbose('no modify sql')
            return
        try:
            sql = self.modify_sql.format(**kwargs)
            cursor_args = None
            if value is not None:
                cursor_args = (value, )
                stdio.verbose('execute sql: %s' % (sql % cursor_args))
            else:
                stdio.verbose('execute sql: %s' % sql)
            cursor.execute(sql, cursor_args)
            cursor.fetchone()
            return True
        except Exception:
            stdio.exception("")
            return False


class OptimizeParser(SafeStdio):
    """Loads optimizer and optimize-config definitions into an OptimizeConfig."""

    def __init__(self):
        self.optimize_config_dict = {}
        self.optimizer_dict = {}
        self._optimize_config = None

    @property
    def optimize_config(self):
        """OptimizeConfig over the loaded dicts, created on first access."""
        if not self._optimize_config:
            self._optimize_config = OptimizeConfig(self.optimize_config_dict, self.optimizer_dict)
        return self._optimize_config

    def load(self, config, stdio=None):
        """Load the ``optimizer`` and ``optimize_config`` sections of a config.

        :return: True on success, False when either section fails to load
            (the error is logged)
        """
        try:
            # load optimizer
            optimizer = config.get('optimizer', {})
            for component, type_optimizer_map in optimizer.items():
                self._load_optimizer_by_component(component, type_optimizer_map)
        except Exception:
            stdio.exception('Failed to load optimizer')
            return False
        try:
            # load optimize config
            optimize_config = config.get('optimize_config', {})
            for component, stage_dict in optimize_config.items():
                self._load_config_by_component(component, stage_dict)
        except Exception:
            stdio.exception('Failed to load optimize config')
            return False
        return True

    def _load_config_by_component(self, component, stage_dict):
        """Replace the whole optimize config of a component.

        :raises Exception: when an optimize type is not in OptimizeObjectMap
        """
        # replace
        self.optimize_config_dict[component] = {}
        for stage, optimize_item in stage_dict.items():
            self.optimize_config_dict[component][stage] = {}
            for optimize_type, content in optimize_item.items():
                if optimize_type not in OptimizeObjectMap:
                    raise Exception('Invalid optimize_type {}'.format(optimize_type))
                entrance_class = OptimizeObjectMap[optimize_type]
                self.optimize_config_dict[component][stage][optimize_type] = entrance_class(content)

    def _load_optimizer_by_component(self, component, optimizer_dict):
        """Merge optimizers into a component; same-named ones are overridden."""
        # override
        component_optimizers = self.optimizer_dict.setdefault(component, {})
        for optimize_type, name_optimizer_map in optimizer_dict.items():
            component_optimizer_map = component_optimizers.setdefault(optimize_type, {})
            for name, optimizer_kwargs in name_optimizer_map.items():
                component_optimizer_map[name] = CursorOptimizer(**optimizer_kwargs)

    def load_config_by_component(self, component, config, stdio=None):
        """Logged, non-raising wrapper of ``_load_config_by_component``.

        :return: True on success, False on failure
        """
        try:
            self._load_config_by_component(component, config)
            return True
        except Exception:
            stdio.exception('Failed to load optimize config for component {}'.format(component))
            return False

    def load_optimizer_by_component(self, component, optimizers, stdio=None):
        """Logged, non-raising wrapper of ``_load_optimizer_by_component``.

        :return: True on success, False on failure
        """
        try:
            self._load_optimizer_by_component(component, optimizers)
            return True
        except Exception:
            stdio.exception('Failed to load optimizer for component {}'.format(component))
            return False