normal_dml_actions_pre.py 6.1 KB
Newer Older
L
LINxiansheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from my_error import MyError
import mysql.connector
from mysql.connector import errorcode
from actions import BaseDMLAction
from actions import reflect_action_cls_list
from actions import fetch_observer_version
from actions import QueryCursor
from actions import check_current_cluster_is_primary
import logging
import my_utils

'''
添加一条normal dml的方法:

在本文件中,添加一个类名以"NormalDMLActionPre"开头并且继承自BaseDMLAction的类,
然后在这个类中实现以下成员函数,并且每个函数执行出错都要抛错:
(1)@staticmethod get_seq_num():
返回一个代表着执行顺序的序列号,该序列号在本文件中不允许重复,若有重复则会报错。
(2)dump_before_do_action(self):
执行action sql之前把一些相关数据dump到日志中。
(3)check_before_do_action(self):
执行action sql之前的检查。
(4)@staticmethod get_action_dml():
返回action sql,并且该sql必须为dml。
(5)@staticmethod get_rollback_sql():
返回回滚该action的sql。
(6)dump_after_do_action(self):
执行action sql之后把一些相关数据dump到日志中。
(7)check_after_do_action(self):
执行action sql之后的检查。
(8)skip_action(self):
check if check_before_do_action() and do_action() can be skipped

举例:
class NormalDMLActionPre1(BaseDMLAction):
  @staticmethod
  def get_seq_num():
    return 0
  def dump_before_do_action(self):
    my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
  def skip_action(self):
    (desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
    return (len(results) > 0)
  def check_before_do_action(self):
    (desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
    if len(results) > 0:
      raise MyError('some row in table test.for_test whose primary key is 9 already exists')
  @staticmethod
  def get_action_dml():
    return """insert into test.for_test values (9, 'haha', 99)"""
  @staticmethod
  def get_rollback_sql():
    return """delete from test.for_test where pk = 9"""
  def dump_after_do_action(self):
    my_utils.query_and_dump_results(self._query_cursor, """select * from test.for_test""")
  def check_after_do_action(self):
    (desc, results) = self._query_cursor.exec_query("""select * from test.for_test where pk = 9""")
    if len(results) != 1:
      raise MyError('there should be only one row whose primary key is 9 in table test.for_test, but there has {0} rows like that'.format(len(results)))
    elif results[0][0] != 9 or results[0][1] != 'haha' or results[0][2] != 99:
      raise MyError('the row that has been inserted is not expected, it is: [{0}]'.format(','.join(str(r) for r in results[0])))
'''

#升级语句对应的action要写在下面的actions begin和actions end这两行之间,
#因为基准版本更新的时候会调用reset_upgrade_scripts.py来清空actions begin和actions end
#这两行之间的这些action,如果不写在这两行之间的话会导致清空不掉相应的action。

####========******####======== actions begin ========####******========####
####========******####========= actions end =========####******========####


def do_normal_dml_actions(cur):
  import normal_dml_actions_pre
  cls_list = reflect_action_cls_list(normal_dml_actions_pre, 'NormalDMLActionPre')

  # check if pre upgrade script can run reentrantly
  query_cur = QueryCursor(cur)
  version = fetch_observer_version(query_cur)
  can_skip = False
  if (cmp(version, "2.2.77") >= 0 and cmp(version, "3.0.0") < 0):
    can_skip = True
  elif (cmp(version, "3.1.1") >= 0):
    can_skip = True
  else:
    can_skip = False

  for cls in cls_list:
    logging.info('do normal dml acion, seq_num: %d', cls.get_seq_num())
    action = cls(cur)
    action.dump_before_do_action()
    if False == can_skip or False == action.skip_action():
      action.check_before_do_action()
      action.do_action()
    else:
      logging.info("skip dml action, seq_num: %d", cls.get_seq_num())
    action.dump_after_do_action()
    action.check_after_do_action()

def do_normal_dml_actions_by_standby_cluster(standby_cluster_infos):
  try:
    for standby_cluster_info in standby_cluster_infos:
      logging.info("do_normal_dml_actions_by_standby_cluster: cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster_info['cluster_id'],
                           standby_cluster_info['ip'],
                           standby_cluster_info['port']))
      logging.info("create connection : cluster_id = {0}, ip = {1}, port = {2}"
                   .format(standby_cluster_info['cluster_id'],
                           standby_cluster_info['ip'],
                           standby_cluster_info['port']))
      conn = mysql.connector.connect(user     =  standby_cluster_info['user'],
                                     password =  standby_cluster_info['pwd'],
                                     host     =  standby_cluster_info['ip'],
                                     port     =  standby_cluster_info['port'],
                                     database =  'oceanbase',
                                     raise_on_warnings = True)

      cur = conn.cursor(buffered=True)
      conn.autocommit = True
      query_cur = QueryCursor(cur)
      is_primary = check_current_cluster_is_primary(query_cur)
      if is_primary:
        logging.exception("""primary cluster changed : cluster_id = {0}, ip = {1}, port = {2}"""
                          .format(standby_cluster_info['cluster_id'],
                                  standby_cluster_info['ip'],
                                  standby_cluster_info['port']))
        raise e

      ## process
      do_normal_dml_actions(cur)

      cur.close()
      conn.close()
  except Exception, e:
    logging.exception("""do_normal_dml_actions_by_standby_cluster failed""")
    raise e

def get_normal_dml_actions_sqls_str():
  import normal_dml_actions_pre
  ret_str = ''
  cls_list = reflect_action_cls_list(normal_dml_actions_pre, 'NormalDMLActionPre')
  for i in range(0, len(cls_list)):
    if i > 0:
      ret_str += '\n'
    ret_str += cls_list[i].get_action_dml() + ';'
  return ret_str