upgrade_sys_vars.py 18.3 KB
Newer Older
L
LINxiansheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import new
import time
import re
import json
import traceback
import sys
import mysql.connector
from mysql.connector import errorcode
import logging
from my_error import MyError
import actions
from actions import DMLCursor
from actions import QueryCursor
from sys_vars_dict import sys_var_dict
import my_utils


# 由于用了/*+read_consistency(WEAK) */来查询,因此升级期间不能允许创建或删除租户

def calc_diff_sys_var(cur, tenant_id):
  try:
    change_tenant(cur, tenant_id)
    actual_tenant_id = get_actual_tenant_id(tenant_id)
    cur.execute("""select name, data_type, value, info, flags, min_val, max_val from __all_sys_variable_history where tenant_id=%s and (tenant_id, zone, name, schema_version) in (select tenant_id, zone, name, max(schema_version) from __all_sys_variable_history where tenant_id=%s group by tenant_id, zone, name);"""%(actual_tenant_id, actual_tenant_id))
    results = cur.fetchall()
    logging.info('there has %s system variable of tenant id %d', len(results), tenant_id)
    update_sys_var_list = []
    update_sys_var_ori_list = []
    add_sys_var_list = []
    for r in results:
      if sys_var_dict.has_key(r[0]):
        sys_var = sys_var_dict[r[0]]
        if long(sys_var["data_type"]) != long(r[1]) or sys_var["info"].strip() != r[3].strip() or long(sys_var["flags"]) != long(r[4]) or ("min_val" in sys_var.keys() and sys_var["min_val"] != r[5]) or ("max_val" in sys_var.keys() and sys_var["max_val"] != r[6]):
          update_sys_var_list.append(sys_var)
          update_sys_var_ori_list.append(r)
    for (name, sys_var) in sys_var_dict.items():
      sys_var_exist = 0
      for r in results:
        if r[0] == sys_var["name"]:
          sys_var_exist = 1
          break
      if 0 == sys_var_exist:
        add_sys_var_list.append(sys_var)
    # reset
    sys_tenant_id = 1
    change_tenant(cur, sys_tenant_id)
    return (update_sys_var_list, update_sys_var_ori_list, add_sys_var_list)
  except Exception, e:
    logging.exception('fail to calc diff sys var')
    raise e

def gen_update_sys_var_sql_for_tenant(tenant_id, sys_var):
  actual_tenant_id = get_actual_tenant_id(tenant_id)
  update_sql = 'update oceanbase.__all_sys_variable set data_type = ' + str(sys_var["data_type"])\
      + ', info = \'' + sys_var["info"].strip() + '\', flags = ' + str(sys_var["flags"])
  update_sql = update_sql\
      + ((', min_val = \'' + sys_var["min_val"] + '\'') if "min_val" in sys_var.keys() else '')\
      + ((', max_val = \'' + sys_var["max_val"] + '\'') if "max_val" in sys_var.keys() else '')
  update_sql = update_sql + ' where tenant_id = ' + str(actual_tenant_id) + ' and name = \'' + sys_var["name"] + '\''
  return update_sql

def gen_update_sys_var_history_sql_for_tenant(dml_cur, tenant_id, sys_var):
  try:
    actual_tenant_id = get_actual_tenant_id(tenant_id)
    (desc, results) = dml_cur.exec_query("""select schema_version from oceanbase.__all_sys_variable_history
                                            where tenant_id = {0} and name = '{1}'
                                            order by schema_version desc limit 1"""
                                            .format(actual_tenant_id, sys_var["name"]))
    schema_version = results[0][0]
    (desc, results) = dml_cur.exec_query("""select value from __all_sys_variable where tenant_id={0} and name='{1}' limit 1"""
                                         .format(actual_tenant_id, sys_var["name"]))
    res_len = len(results)
    if res_len != 1:
      logging.error('fail to get value from __all_sys_variable, result count:'+ str(res_len))
      raise MyError('fail to get value from __all_sys_variable')
    value = results[0][0]
    min_val = sys_var["min_val"] if "min_val" in sys_var.keys() else ''
    max_val = sys_var["max_val"] if "max_val" in sys_var.keys() else ''
    replace_sql = """replace into oceanbase.__all_sys_variable_history(
                          tenant_id,
                          zone,
                          name,
                          schema_version,
                          is_deleted,
                          data_type,
                          value,
                          info,
                          flags,
                          min_val,
                          max_val)
                      values(%d, '', '%s', %d, 0, %d, '%s', '%s', %d, '%s', '%s')
                  """%(actual_tenant_id, sys_var["name"], schema_version, sys_var["data_type"], value, sys_var["info"], sys_var["flags"], min_val, max_val)
    return replace_sql
  except Exception, e:
    logging.exception('fail to gen replace sys var history sql')
    raise e

def gen_replace_sys_var_history_sql_for_tenant(dml_cur, tenant_id, sys_var):
  try:
    actual_tenant_id = get_actual_tenant_id(tenant_id)
    (desc, results) = dml_cur.exec_query("""select schema_version from oceanbase.__all_sys_variable_history
                                            where tenant_id={0} order by schema_version asc limit 1""".format(actual_tenant_id))
    schema_version = results[0][0]
    min_val = sys_var["min_val"] if "min_val" in sys_var.keys() else ''
    max_val = sys_var["max_val"] if "max_val" in sys_var.keys() else ''
    replace_sql = """replace into oceanbase.__all_sys_variable_history(
                          tenant_id,
                          zone,
                          name,
                          schema_version,
                          is_deleted,
                          data_type,
                          value,
                          info,
                          flags,
                          min_val,
                          max_val)
                      values(%d, '', '%s', %d, 0, %d, '%s', '%s', %d, '%s', '%s')
                  """%(actual_tenant_id, sys_var["name"], schema_version, sys_var["data_type"], sys_var["value"], sys_var["info"], sys_var["flags"], min_val, max_val)
    return replace_sql
  except Exception, e:
    logging.exception('fail to gen replace sys var history sql')
    raise e


def gen_sys_var_update_sqls_for_tenant(query_cur, tenant_id, update_sys_var_list):
  update_sqls = ''
  for i in range(0, len(update_sys_var_list)):
    sys_var = update_sys_var_list[i]
    if i > 0:
      update_sqls += '\n'
    update_sqls += gen_update_sys_var_sql_for_tenant(tenant_id, sys_var) + ';\n'
    update_sqls += gen_update_sys_var_history_sql_for_tenant(query_cur, tenant_id, sys_var) + ';'
  return update_sqls

def update_sys_vars_for_tenant(dml_cur, tenant_id, update_sys_var_list):
  try:
    for i in range(0, len(update_sys_var_list)):
      sys_var = update_sys_var_list[i]
      update_sql = gen_update_sys_var_sql_for_tenant(tenant_id, sys_var)
      rowcount = dml_cur.exec_update(update_sql)
      if 1 != rowcount:
        # 以history为准,考虑可重入,此处不校验__all_sys_variable的更新结果
        logging.info('sys var not change, just skip, sql: %s, tenant_id: %d', update_sql, tenant_id)
      else:
        logging.info('succeed to update sys var for tenant, sql: %s, tenant_id: %d', update_sql, tenant_id)
#replace update sys var to __all_sys_variable_history
      replace_sql = gen_update_sys_var_history_sql_for_tenant(dml_cur, tenant_id, sys_var)
      rowcount = dml_cur.exec_update(replace_sql)
      if 1 != rowcount and 2 != rowcount:
        logging.error('fail to replace sysvar, replace_sql:%s'%replace_sql)
        raise MyError('fail to repalce sysvar')
      else:
        logging.info('succeed to replace sys var history for tenant, sql: %s, tenant_id: %d', replace_sql, tenant_id)
  except Exception, e:
    logging.exception('fail to update for tenant, tenant_id: %d', tenant_id)
    raise e

def gen_add_sys_var_sql_for_tenant(tenant_id, sys_var):
  actual_tenant_id = get_actual_tenant_id(tenant_id)
  add_sql = 'replace into oceanbase.__all_sys_variable(tenant_id, zone, name, data_type, value, info, flags, min_val, max_val) values('\
      + str(actual_tenant_id) +', \'\', \'' + sys_var["name"] + '\', ' + str(sys_var["data_type"]) + ', \'' + sys_var["value"] + '\', \''\
      + sys_var["info"].strip() + '\', ' + str(sys_var["flags"]) + ', \''
  add_sql = add_sql + (sys_var["min_val"] if "min_val" in sys_var.keys() else '') + '\', \''\
      + (sys_var["max_val"] if "max_val" in sys_var.keys() else '') + '\')'
  return add_sql

def gen_sys_var_add_sqls_for_tenant(query_cur, tenant_id, add_sys_var_list):
  add_sqls = ''
  for i in range(0, len(add_sys_var_list)):
    sys_var = add_sys_var_list[i]
    if i > 0:
      add_sqls += '\n'
    add_sqls += gen_add_sys_var_sql_for_tenant(tenant_id, sys_var) + ';\n'
    add_sqls += gen_replace_sys_var_history_sql_for_tenant(query_cur, tenant_id, sys_var) + ';'
  return add_sqls

def add_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list):
  try:
    for i in range(0, len(add_sys_var_list)):
      sys_var = add_sys_var_list[i]
      add_sql = gen_add_sys_var_sql_for_tenant(tenant_id, sys_var)
      rowcount = dml_cur.exec_update(add_sql)
      if 1 != rowcount:
        # 以history为准,考虑可重入,此处不校验__all_sys_variable的更新结果
        logging.info('sys var not change, just skip, sql: %s, tenant_id: %d', update_sql, tenant_id)
      else:
        logging.info('succeed to insert sys var for tenant, sql: %s, tenant_id: %d', add_sql, tenant_id)
      replace_sql = gen_replace_sys_var_history_sql_for_tenant(dml_cur, tenant_id, sys_var)
      rowcount = dml_cur.exec_update(replace_sql)
      if 1 != rowcount:
        logging.error('fail to replace system variable history, sql:%s'%replace_sql)
        raise MyError('fail to replace system variable history')
      else:
        logging.info('succeed to replace sys var for tenant, sql: %s, tenant_id: %d', replace_sql, tenant_id)
  except Exception, e:
    logging.exception('fail to add for tenant, tenant_id: %d', tenant_id)
    raise e


def gen_sys_var_special_update_sqls_for_tenant(tenant_id):
  special_update_sqls = ''
  return special_update_sqls

def special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, sys_var_name, sys_var_value):
  try:
    sys_var = None
    for i in range(0, len(add_sys_var_list)):
      if (sys_var_name == add_sys_var_list[i]["name"]):
        sys_var = add_sys_var_list[i]
        break;

    if None == sys_var:
      logging.info('%s is not new, no need special update again', sys_var_name)
      return

    sys_var["value"] = sys_var_value;
    update_sql = gen_update_sys_var_value_sql_for_tenant(tenant_id, sys_var)
    rowcount = dml_cur.exec_update(update_sql)
    if 1 != rowcount:
      # 以history为准,考虑可重入,此处不校验__all_sys_variable的更新结果
      logging.info('sys var not change, just skip, sql: %s, tenant_id: %d', update_sql, tenant_id)
    else:
      logging.info('succeed to update sys var for tenant, sql: %s, tenant_id: %d', update_sql, tenant_id)
    #replace update sys var to __all_sys_variable_history
    replace_sql = gen_update_sys_var_history_sql_for_tenant(dml_cur, tenant_id, sys_var)
    rowcount = dml_cur.exec_update(replace_sql)
    if 1 != rowcount and 2 != rowcount:
      logging.error('fail to replace sysvar, replace_sql:%s'%replace_sql)
      raise MyError('fail to repalce sysvar')
    else:
      logging.info('succeed to replace sys var history for tenant, sql: %s, tenant_id: %d', replace_sql, tenant_id)
  except Exception, e:
    logging.exception('fail to add for tenant, tenant_id: %d', tenant_id)
    raise e

def get_sys_vars_upgrade_dmls_str(cur, query_cur, tenant_id_list, update_sys_var_list, add_sys_var_list):
  ret_str = ''
  if len(tenant_id_list) <= 0:
    logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
    raise MyError('invalid arg')
  for i in range(0, len(tenant_id_list)):
    tenant_id = tenant_id_list[i]
    change_tenant(cur, tenant_id)
    if i > 0:
      ret_str += '\n'
    ret_str += gen_sys_var_update_sqls_for_tenant(query_cur, tenant_id, update_sys_var_list)
  if ret_str != '' and len(add_sys_var_list) > 0:
    ret_str += '\n'
  for i in range(0, len(tenant_id_list)):
    tenant_id = tenant_id_list[i]
    change_tenant(cur, tenant_id)
    if i > 0:
      ret_str += '\n'
    ret_str += gen_sys_var_add_sqls_for_tenant(query_cur, tenant_id, add_sys_var_list)
  if ret_str != '' and gen_sys_var_special_update_sqls_for_tenant(tenant_id_list[0]) != '':
    ret_str += '\n'
  for i in range(0, len(tenant_id_list)):
    tenant_id = tenant_id_list[i]
    change_tenant(cur, tenant_id)
    if i > 0:
      ret_str += '\n'
    ret_str += gen_sys_var_special_update_sqls_for_tenant(tenant_id)
  sys_tenant_id= 1
  change_tenant(cur, sys_tenant_id)
  return ret_str

def gen_update_sys_var_value_sql_for_tenant(tenant_id, sys_var):
  update_sql = ('update oceanbase.__all_sys_variable set value = \'' + str(sys_var["value"])
      + '\' where tenant_id = ' + str(tenant_id) + ' and name = \'' + sys_var["name"] + '\'')
  return update_sql

# 修改相关实现需要调整ObUpgradeUtils::upgrade_sys_variable()
def exec_sys_vars_upgrade_dml(cur, tenant_id_list):
  if len(tenant_id_list) <= 0:
    logging.error('distinct tenant id count is <= 0, tenant_id_count: %d', len(tenant_id_list))
    raise MyError('invalid arg')
  dml_cur = DMLCursor(cur)
  # 操作前先dump出oceanbase.__all_sys_variable表的所有数据
  my_utils.query_and_dump_results(dml_cur, """select * from oceanbase.__all_virtual_sys_variable""")
  # 操作前先dump出oceanbase.__all_sys_variable_history表的所有数据
  my_utils.query_and_dump_results(dml_cur, """select * from oceanbase.__all_virtual_sys_variable_history""")

  for i in range(0, len(tenant_id_list)):
    tenant_id = tenant_id_list[i]
    # calc diff
    (update_sys_var_list, update_sys_var_ori_list, add_sys_var_list) = calc_diff_sys_var(cur, tenant_id)
    logging.info('update system variables list: [%s]', ', '.join(str(sv) for sv in update_sys_var_list))
    logging.info('update system variables original list: [%s]', ', '.join(str(sv) for sv in update_sys_var_ori_list))
    logging.info('add system variables list: [%s]', ', '.join(str(sv) for sv in add_sys_var_list))
    # update
    change_tenant(cur, tenant_id)
    update_sys_vars_for_tenant(dml_cur, tenant_id, update_sys_var_list)
    add_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list)
    special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, 'nls_date_format', 'YYYY-MM-DD HH24:MI:SS');
    special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, 'nls_timestamp_format', 'YYYY-MM-DD HH24:MI:SS.FF');
    special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, 'nls_timestamp_tz_format', 'YYYY-MM-DD HH24:MI:SS.FF TZR TZD');
  # reset
  sys_tenant_id = 1
  change_tenant(cur, sys_tenant_id)

def exec_sys_vars_upgrade_dml_in_standby_cluster(standby_cluster_infos):
  try:
    for standby_cluster_info in standby_cluster_infos:
      exec_sys_vars_upgrade_dml_by_cluster(standby_cluster_info)
  except Exception, e:
    logging.exception("""exec_sys_vars_upgrade_dml_in_standby_cluster failed""")
    raise e

def exec_sys_vars_upgrade_dml_by_cluster(standby_cluster_info):
  try:

    logging.info("exec_sys_vars_upgrade_dml_by_cluster : cluster_id = {0}, ip = {1}, port = {2}"
                 .format(standby_cluster_info['cluster_id'],
                         standby_cluster_info['ip'],
                         standby_cluster_info['port']))
    logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
                 .format(standby_cluster_info['cluster_id'],
                         standby_cluster_info['ip'],
                         standby_cluster_info['port']))
    conn = mysql.connector.connect(user     =  standby_cluster_info['user'],
                                   password =  standby_cluster_info['pwd'],
                                   host     =  standby_cluster_info['ip'],
                                   port     =  standby_cluster_info['port'],
                                   database =  'oceanbase',
                                   raise_on_warnings = True)
    cur = conn.cursor(buffered=True)
    conn.autocommit = True
    dml_cur = DMLCursor(cur)
    query_cur = QueryCursor(cur)
    is_primary = actions.check_current_cluster_is_primary(query_cur)
    if is_primary:
      logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
                        .format(standby_cluster_info['cluster_id'],
                                standby_cluster_info['ip'],
                                standby_cluster_info['port']))
      raise e

    # only update sys tenant in standby cluster
    tenant_id = 1
    # calc diff
    (update_sys_var_list, update_sys_var_ori_list, add_sys_var_list) = calc_diff_sys_var(cur, tenant_id)
    logging.info('update system variables list: [%s]', ', '.join(str(sv) for sv in update_sys_var_list))
    logging.info('update system variables original list: [%s]', ', '.join(str(sv) for sv in update_sys_var_ori_list))
    logging.info('add system variables list: [%s]', ', '.join(str(sv) for sv in add_sys_var_list))
    # update
    update_sys_vars_for_tenant(dml_cur, tenant_id, update_sys_var_list)
    add_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list)
    special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, 'nls_date_format', 'YYYY-MM-DD HH24:MI:SS');
    special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, 'nls_timestamp_format', 'YYYY-MM-DD HH24:MI:SS.FF');
    special_update_sys_vars_for_tenant(dml_cur, tenant_id, add_sys_var_list, 'nls_timestamp_tz_format', 'YYYY-MM-DD HH24:MI:SS.FF TZR TZD');

    cur.close()
    conn.close()

  except Exception, e:
    logging.exception("""exec_sys_vars_upgrade_dml_in_standby_cluster failed :
                         cluster_id = {0}, ip = {1}, port = {2}"""
                         .format(standby_cluster_info['cluster_id'],
                                 standby_cluster_info['ip'],
                                 standby_cluster_info['port']))
    raise e


def get_actual_tenant_id(tenant_id):
  return tenant_id if (1 == tenant_id) else 0;

def change_tenant(cur, tenant_id):
  # change tenant
  sql = "alter system change tenant tenant_id = {0};".format(tenant_id)
  logging.info(sql);
  cur.execute(sql);
  # check
  sql = "select effective_tenant_id();"
  cur.execute(sql)
  result = cur.fetchall()
  if (1 != len(result) or 1 != len(result[0])):
    raise MyError("invalid result cnt")
  elif (tenant_id != result[0][0]):
    raise MyError("effective_tenant_id:{0} , tenant_id:{1}".format(result[0][0], tenant_id))