提交 7d8a1504 编写于 作者: A Asim R P

Enable connections to standby from an isolation2 spec

This feature enables tests to run SQL on standby after it is promoted.  Use
"-1S: <sql>" to run <sql> statement on standby.  It is assumed that the standby
is already promoted.
上级 c2f00f14
...@@ -55,16 +55,16 @@ class SQLIsolationExecutor(object): ...@@ -55,16 +55,16 @@ class SQLIsolationExecutor(object):
# The re.S flag makes the "." in the regex match newlines. # The re.S flag makes the "." in the regex match newlines.
# When matched against a command in process_command(), all # When matched against a command in process_command(), all
# lines in the command are matched and sent as SQL query. # lines in the command are matched and sent as SQL query.
self.command_pattern = re.compile(r"^(-?\d+|[*])([&\\<\\>UIq]*?)\:(.*)", re.S) self.command_pattern = re.compile(r"^(-?\d+|[*])([&\\<\\>USIq]*?)\:(.*)", re.S)
if dbname: if dbname:
self.dbname = dbname self.dbname = dbname
else: else:
self.dbname = os.environ.get('PGDATABASE') self.dbname = os.environ.get('PGDATABASE')
class SQLConnection(object): class SQLConnection(object):
def __init__(self, out_file, name, utility_mode, dbname): def __init__(self, out_file, name, mode, dbname):
self.name = name self.name = name
self.utility_mode = utility_mode self.mode = mode
self.out_file = out_file self.out_file = out_file
self.dbname = dbname self.dbname = dbname
...@@ -82,7 +82,7 @@ class SQLIsolationExecutor(object): ...@@ -82,7 +82,7 @@ class SQLIsolationExecutor(object):
def session_process(self, pipe): def session_process(self, pipe):
sp = SQLIsolationExecutor.SQLSessionProcess(self.name, sp = SQLIsolationExecutor.SQLSessionProcess(self.name,
self.utility_mode, pipe, self.dbname) self.mode, pipe, self.dbname)
sp.do() sp.do()
def query(self, command): def query(self, command):
...@@ -135,21 +135,29 @@ class SQLIsolationExecutor(object): ...@@ -135,21 +135,29 @@ class SQLIsolationExecutor(object):
self.p.terminate() self.p.terminate()
class SQLSessionProcess(object): class SQLSessionProcess(object):
def __init__(self, name, utility_mode, pipe, dbname): def __init__(self, name, mode, pipe, dbname):
""" """
Constructor Constructor
""" """
self.name = name self.name = name
self.utility_mode = utility_mode self.mode = mode
self.pipe = pipe self.pipe = pipe
self.dbname = dbname self.dbname = dbname
if self.utility_mode: if self.mode == "utility":
(hostname, port) = self.get_utility_mode_port(name) (hostname, port) = self.get_hostname_port(name, 'p')
self.con = self.connectdb(given_dbname=self.dbname, self.con = self.connectdb(given_dbname=self.dbname,
given_host=hostname, given_host=hostname,
given_port=port, given_port=port,
given_opt="-c gp_session_role=utility") given_opt="-c gp_session_role=utility")
elif self.mode == "standby":
# Connect to standby even when it's role is recorded
# as mirror. This is useful for scenarios where a
# test needs to promote a standby without using
# gpactivatestandby.
(hostname, port) = self.get_hostname_port(name, 'm')
self.con = self.connectdb(given_dbname=self.dbname,
given_host=hostname,
given_port=port)
else: else:
self.con = self.connectdb(self.dbname) self.con = self.connectdb(self.dbname)
...@@ -178,15 +186,17 @@ class SQLIsolationExecutor(object): ...@@ -178,15 +186,17 @@ class SQLIsolationExecutor(object):
raise raise
return con return con
def get_utility_mode_port(self, name): def get_hostname_port(self, contentid, role):
""" """
Gets the port number/hostname combination of the Gets the port number/hostname combination of the
contentid = name and role = primary contentid and role
""" """
query = ("SELECT hostname, port FROM gp_segment_configuration WHERE"
" content = %s AND role = '%s'") % (contentid, role)
con = self.connectdb(self.dbname) con = self.connectdb(self.dbname)
r = con.query("SELECT hostname, port FROM gp_segment_configuration WHERE content = %s and role = 'p'" % name).getresult() r = con.query(query).getresult()
if len(r) == 0: if len(r) == 0:
raise Exception("Invalid content %s" % name) raise Exception("Invalid content %s" % contentid)
if r[0][0] == socket.gethostname(): if r[0][0] == socket.gethostname():
return (None, int(r[0][1])) return (None, int(r[0][1]))
return (r[0][0], int(r[0][1])) return (r[0][0], int(r[0][1]))
...@@ -289,35 +299,35 @@ class SQLIsolationExecutor(object): ...@@ -289,35 +299,35 @@ class SQLIsolationExecutor(object):
(c, wait) = self.pipe.recv() (c, wait) = self.pipe.recv()
def get_process(self, out_file, name, utility_mode=False, dbname=""): def get_process(self, out_file, name, mode="", dbname=""):
""" """
Gets or creates the process by the given name Gets or creates the process by the given name
""" """
if len(name) > 0 and not is_digit(name): if len(name) > 0 and not is_digit(name):
raise Exception("Name should be a number") raise Exception("Name should be a number")
if len(name) > 0 and not utility_mode and int(name) >= 1024: if len(name) > 0 and mode != "utility" and int(name) >= 1024:
raise Exception("Session name should be smaller than 1024 unless it is utility mode number") raise Exception("Session name should be smaller than 1024 unless it is utility mode number")
if not (name, utility_mode) in self.processes: if not (name, mode) in self.processes:
if not dbname: if not dbname:
dbname = self.dbname dbname = self.dbname
self.processes[(name, utility_mode)] = SQLIsolationExecutor.SQLConnection(out_file, name, utility_mode, dbname) self.processes[(name, mode)] = SQLIsolationExecutor.SQLConnection(out_file, name, mode, dbname)
return self.processes[(name, utility_mode)] return self.processes[(name, mode)]
def quit_process(self, out_file, name, utility_mode=False, dbname=""): def quit_process(self, out_file, name, mode="", dbname=""):
""" """
Quits a process with the given name Quits a process with the given name
""" """
if len(name) > 0 and not is_digit(name): if len(name) > 0 and not is_digit(name):
raise Exception("Name should be a number") raise Exception("Name should be a number")
if len(name) > 0 and not utility_mode and int(name) >= 1024: if len(name) > 0 and mode != "utility" and int(name) >= 1024:
raise Exception("Session name should be smaller than 1024 unless it is utility mode number") raise Exception("Session name should be smaller than 1024 unless it is utility mode number")
if not (name, utility_mode) in self.processes: if not (name, mode) in self.processes:
raise Exception("Sessions not started cannot be quit") raise Exception("Sessions not started cannot be quit")
self.processes[(name, utility_mode)].quit() self.processes[(name, mode)].quit()
del self.processes[(name, utility_mode)] del self.processes[(name, mode)]
def get_all_primary_contentids(self, dbname): def get_all_primary_contentids(self, dbname):
""" """
...@@ -342,11 +352,18 @@ class SQLIsolationExecutor(object): ...@@ -342,11 +352,18 @@ class SQLIsolationExecutor(object):
process_name = "" process_name = ""
sql = command sql = command
flag = "" flag = ""
con_mode = ""
dbname = "" dbname = ""
m = self.command_pattern.match(command) m = self.command_pattern.match(command)
if m: if m:
process_name = m.groups()[0] process_name = m.groups()[0]
flag = m.groups()[1] flag = m.groups()[1]
if flag and flag[0] == "U":
con_mode = "utility"
elif flag and flag[0] == "S":
if len(flag) > 1:
flag = flag[1:]
con_mode = "standby"
sql = m.groups()[2] sql = m.groups()[2]
sql = sql.lstrip() sql = sql.lstrip()
# If db_name is specifed , it should be of the following syntax: # If db_name is specifed , it should be of the following syntax:
...@@ -399,19 +416,19 @@ class SQLIsolationExecutor(object): ...@@ -399,19 +416,19 @@ class SQLIsolationExecutor(object):
load_helper_file(helper_file) load_helper_file(helper_file)
) )
else: else:
self.get_process(output_file, process_name, dbname=dbname).query(sql.strip()) self.get_process(output_file, process_name, con_mode, dbname=dbname).query(sql.strip())
elif flag == "&": elif flag == "&":
self.get_process(output_file, process_name, dbname=dbname).fork(sql.strip(), True) self.get_process(output_file, process_name, con_mode, dbname=dbname).fork(sql.strip(), True)
elif flag == ">": elif flag == ">":
self.get_process(output_file, process_name, dbname=dbname).fork(sql.strip(), False) self.get_process(output_file, process_name, con_mode, dbname=dbname).fork(sql.strip(), False)
elif flag == "<": elif flag == "<":
if len(sql) > 0: if len(sql) > 0:
raise Exception("No query should be given on join") raise Exception("No query should be given on join")
self.get_process(output_file, process_name, dbname=dbname).join() self.get_process(output_file, process_name, con_mode, dbname=dbname).join()
elif flag == "q": elif flag == "q":
if len(sql) > 0: if len(sql) > 0:
raise Exception("No query should be given on quit") raise Exception("No query should be given on quit")
self.quit_process(output_file, process_name, dbname=dbname) self.quit_process(output_file, process_name, con_mode, dbname=dbname)
elif flag == "U": elif flag == "U":
if process_name == '*': if process_name == '*':
process_names = [str(content) for content in self.get_all_primary_contentids(dbname)] process_names = [str(content) for content in self.get_all_primary_contentids(dbname)]
...@@ -419,17 +436,19 @@ class SQLIsolationExecutor(object): ...@@ -419,17 +436,19 @@ class SQLIsolationExecutor(object):
process_names = [process_name] process_names = [process_name]
for name in process_names: for name in process_names:
self.get_process(output_file, name, utility_mode=True, dbname=dbname).query(sql.strip()) self.get_process(output_file, name, con_mode, dbname=dbname).query(sql.strip())
elif flag == "U&": elif flag == "U&":
self.get_process(output_file, process_name, utility_mode=True, dbname=dbname).fork(sql.strip(), True) self.get_process(output_file, process_name, con_mode, dbname=dbname).fork(sql.strip(), True)
elif flag == "U<": elif flag == "U<":
if len(sql) > 0: if len(sql) > 0:
raise Exception("No query should be given on join") raise Exception("No query should be given on join")
self.get_process(output_file, process_name, utility_mode=True, dbname=dbname).join() self.get_process(output_file, process_name, con_mode, dbname=dbname).join()
elif flag == "Uq": elif flag == "Uq":
if len(sql) > 0: if len(sql) > 0:
raise Exception("No query should be given on quit") raise Exception("No query should be given on quit")
self.quit_process(output_file, process_name, utility_mode=True, dbname=dbname) self.quit_process(output_file, process_name, con_mode, dbname=dbname)
elif flag == "S":
self.get_process(output_file, process_name, con_mode, dbname=dbname).query(sql.strip())
else: else:
raise Exception("Invalid isolation flag") raise Exception("Invalid isolation flag")
...@@ -449,7 +468,7 @@ class SQLIsolationExecutor(object): ...@@ -449,7 +468,7 @@ class SQLIsolationExecutor(object):
command_part = line.partition("--")[0] # remove comment from line command_part = line.partition("--")[0] # remove comment from line
if command_part == "" or command_part == "\n": if command_part == "" or command_part == "\n":
print >>output_file print >>output_file
elif command_part.endswith(";\n") or re.match(r"^\d+[q\\<]:$", line) or re.match(r"^-?\d+U[q\\<]:$", line): elif command_part.endswith(";\n") or re.match(r"^\d+[q\\<]:$", line) or re.match(r"^-?\d+[SU][q\\<]:$", line):
command += command_part command += command_part
try: try:
self.process_command(command, output_file) self.process_command(command, output_file)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册