提交 2db1b0d8 编写于 作者: A Asim R P 提交者: Asim RP

Enable connections to standby from an isolation2 spec

This feature enables tests to run SQL on standby after it is promoted.  Use
"-1S: <sql>" to run <sql> statement on standby.  It is assumed that the standby
is already promoted.
上级 06afd1cd
......@@ -55,16 +55,16 @@ class SQLIsolationExecutor(object):
# The re.S flag makes the "." in the regex match newlines.
# When matched against a command in process_command(), all
# lines in the command are matched and sent as SQL query.
self.command_pattern = re.compile(r"^(-?\d+|[*])([&\\<\\>UIq]*?)\:(.*)", re.S)
self.command_pattern = re.compile(r"^(-?\d+|[*])([&\\<\\>USIq]*?)\:(.*)", re.S)
if dbname:
self.dbname = dbname
else:
self.dbname = os.environ.get('PGDATABASE')
class SQLConnection(object):
def __init__(self, out_file, name, utility_mode, dbname):
def __init__(self, out_file, name, mode, dbname):
self.name = name
self.utility_mode = utility_mode
self.mode = mode
self.out_file = out_file
self.dbname = dbname
......@@ -82,7 +82,7 @@ class SQLIsolationExecutor(object):
def session_process(self, pipe):
sp = SQLIsolationExecutor.SQLSessionProcess(self.name,
self.utility_mode, pipe, self.dbname)
self.mode, pipe, self.dbname)
sp.do()
def query(self, command):
......@@ -135,21 +135,29 @@ class SQLIsolationExecutor(object):
self.p.terminate()
class SQLSessionProcess(object):
def __init__(self, name, utility_mode, pipe, dbname):
def __init__(self, name, mode, pipe, dbname):
"""
Constructor
"""
self.name = name
self.utility_mode = utility_mode
self.mode = mode
self.pipe = pipe
self.dbname = dbname
if self.utility_mode:
(hostname, port) = self.get_utility_mode_port(name)
if self.mode == "utility":
(hostname, port) = self.get_hostname_port(name, 'p')
self.con = self.connectdb(given_dbname=self.dbname,
given_host=hostname,
given_port=port,
given_opt="-c gp_session_role=utility")
elif self.mode == "standby":
# Connect to standby even when it's role is recorded
# as mirror. This is useful for scenarios where a
# test needs to promote a standby without using
# gpactivatestandby.
(hostname, port) = self.get_hostname_port(name, 'm')
self.con = self.connectdb(given_dbname=self.dbname,
given_host=hostname,
given_port=port)
else:
self.con = self.connectdb(self.dbname)
......@@ -178,15 +186,17 @@ class SQLIsolationExecutor(object):
raise
return con
def get_utility_mode_port(self, name):
def get_hostname_port(self, contentid, role):
"""
Gets the port number/hostname combination of the
contentid = name and role = primary
contentid and role
"""
query = ("SELECT hostname, port FROM gp_segment_configuration WHERE"
" content = %s AND role = '%s'") % (contentid, role)
con = self.connectdb(self.dbname)
r = con.query("SELECT hostname, port FROM gp_segment_configuration WHERE content = %s and role = 'p'" % name).getresult()
r = con.query(query).getresult()
if len(r) == 0:
raise Exception("Invalid content %s" % name)
raise Exception("Invalid content %s" % contentid)
if r[0][0] == socket.gethostname():
return (None, int(r[0][1]))
return (r[0][0], int(r[0][1]))
......@@ -289,35 +299,35 @@ class SQLIsolationExecutor(object):
(c, wait) = self.pipe.recv()
def get_process(self, out_file, name, utility_mode=False, dbname=""):
def get_process(self, out_file, name, mode="", dbname=""):
"""
Gets or creates the process by the given name
"""
if len(name) > 0 and not is_digit(name):
raise Exception("Name should be a number")
if len(name) > 0 and not utility_mode and int(name) >= 1024:
if len(name) > 0 and mode != "utility" and int(name) >= 1024:
raise Exception("Session name should be smaller than 1024 unless it is utility mode number")
if not (name, utility_mode) in self.processes:
if not (name, mode) in self.processes:
if not dbname:
dbname = self.dbname
self.processes[(name, utility_mode)] = SQLIsolationExecutor.SQLConnection(out_file, name, utility_mode, dbname)
return self.processes[(name, utility_mode)]
self.processes[(name, mode)] = SQLIsolationExecutor.SQLConnection(out_file, name, mode, dbname)
return self.processes[(name, mode)]
def quit_process(self, out_file, name, utility_mode=False, dbname=""):
def quit_process(self, out_file, name, mode="", dbname=""):
"""
Quits a process with the given name
"""
if len(name) > 0 and not is_digit(name):
raise Exception("Name should be a number")
if len(name) > 0 and not utility_mode and int(name) >= 1024:
if len(name) > 0 and mode != "utility" and int(name) >= 1024:
raise Exception("Session name should be smaller than 1024 unless it is utility mode number")
if not (name, utility_mode) in self.processes:
if not (name, mode) in self.processes:
raise Exception("Sessions not started cannot be quit")
self.processes[(name, utility_mode)].quit()
del self.processes[(name, utility_mode)]
self.processes[(name, mode)].quit()
del self.processes[(name, mode)]
def get_all_primary_contentids(self, dbname):
"""
......@@ -342,11 +352,18 @@ class SQLIsolationExecutor(object):
process_name = ""
sql = command
flag = ""
con_mode = ""
dbname = ""
m = self.command_pattern.match(command)
if m:
process_name = m.groups()[0]
flag = m.groups()[1]
if flag and flag[0] == "U":
con_mode = "utility"
elif flag and flag[0] == "S":
if len(flag) > 1:
flag = flag[1:]
con_mode = "standby"
sql = m.groups()[2]
sql = sql.lstrip()
# If db_name is specifed , it should be of the following syntax:
......@@ -399,19 +416,19 @@ class SQLIsolationExecutor(object):
load_helper_file(helper_file)
)
else:
self.get_process(output_file, process_name, dbname=dbname).query(sql.strip())
self.get_process(output_file, process_name, con_mode, dbname=dbname).query(sql.strip())
elif flag == "&":
self.get_process(output_file, process_name, dbname=dbname).fork(sql.strip(), True)
self.get_process(output_file, process_name, con_mode, dbname=dbname).fork(sql.strip(), True)
elif flag == ">":
self.get_process(output_file, process_name, dbname=dbname).fork(sql.strip(), False)
self.get_process(output_file, process_name, con_mode, dbname=dbname).fork(sql.strip(), False)
elif flag == "<":
if len(sql) > 0:
raise Exception("No query should be given on join")
self.get_process(output_file, process_name, dbname=dbname).join()
self.get_process(output_file, process_name, con_mode, dbname=dbname).join()
elif flag == "q":
if len(sql) > 0:
raise Exception("No query should be given on quit")
self.quit_process(output_file, process_name, dbname=dbname)
self.quit_process(output_file, process_name, con_mode, dbname=dbname)
elif flag == "U":
if process_name == '*':
process_names = [str(content) for content in self.get_all_primary_contentids(dbname)]
......@@ -419,17 +436,19 @@ class SQLIsolationExecutor(object):
process_names = [process_name]
for name in process_names:
self.get_process(output_file, name, utility_mode=True, dbname=dbname).query(sql.strip())
self.get_process(output_file, name, con_mode, dbname=dbname).query(sql.strip())
elif flag == "U&":
self.get_process(output_file, process_name, utility_mode=True, dbname=dbname).fork(sql.strip(), True)
self.get_process(output_file, process_name, con_mode, dbname=dbname).fork(sql.strip(), True)
elif flag == "U<":
if len(sql) > 0:
raise Exception("No query should be given on join")
self.get_process(output_file, process_name, utility_mode=True, dbname=dbname).join()
self.get_process(output_file, process_name, con_mode, dbname=dbname).join()
elif flag == "Uq":
if len(sql) > 0:
raise Exception("No query should be given on quit")
self.quit_process(output_file, process_name, utility_mode=True, dbname=dbname)
self.quit_process(output_file, process_name, con_mode, dbname=dbname)
elif flag == "S":
self.get_process(output_file, process_name, con_mode, dbname=dbname).query(sql.strip())
else:
raise Exception("Invalid isolation flag")
......@@ -449,7 +468,7 @@ class SQLIsolationExecutor(object):
command_part = line.partition("--")[0] # remove comment from line
if command_part == "" or command_part == "\n":
print >>output_file
elif command_part.endswith(";\n") or re.match(r"^\d+[q\\<]:$", line) or re.match(r"^-?\d+U[q\\<]:$", line):
elif command_part.endswith(";\n") or re.match(r"^\d+[q\\<]:$", line) or re.match(r"^-?\d+[SU][q\\<]:$", line):
command += command_part
try:
self.process_command(command, output_file)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册