未验证 提交 329f5f1b 编写于 作者: J Jialun 提交者: GitHub

Add gpexpand rollback test case (#6129)

- Add a fault inject to test rollback process
- Remove pg_hba.conf updating during online expand, for only master
  will connect to segment via libpq, master's ip has already been
  in pg_hba.conf
上级 3f6f76d8
......@@ -444,15 +444,13 @@ class GpExpandStatus():
'BUILD_SEGMENT_TEMPLATE_DONE': 4,
'BUILD_SEGMENTS_STARTED': 5,
'BUILD_SEGMENTS_DONE': 6,
'UPDATE_OLD_SEGMENTS_STARTED': 7,
'UPDATE_OLD_SEGMENTS_DONE': 8,
'UPDATE_CATALOG_STARTED': 9,
'UPDATE_CATALOG_DONE': 10,
'SETUP_EXPANSION_SCHEMA_STARTED': 11,
'SETUP_EXPANSION_SCHEMA_DONE': 12,
'PREPARE_EXPANSION_SCHEMA_STARTED': 13,
'PREPARE_EXPANSION_SCHEMA_DONE': 14,
'EXPANSION_PREPARE_DONE': 15
'UPDATE_CATALOG_STARTED': 7,
'UPDATE_CATALOG_DONE': 8,
'SETUP_EXPANSION_SCHEMA_STARTED': 9,
'SETUP_EXPANSION_SCHEMA_DONE': 10,
'PREPARE_EXPANSION_SCHEMA_STARTED': 11,
'PREPARE_EXPANSION_SCHEMA_DONE': 12,
'EXPANSION_PREPARE_DONE': 13
}
self._status = []
self._status_info = []
......@@ -464,7 +462,6 @@ class GpExpandStatus():
self._fp_standby = None
self._temp_dir = None
self._input_filename = None
self._original_primary_count = None
self._gp_segment_configuration_backup = None
if os.path.exists(self._status_filename):
......@@ -487,8 +484,6 @@ class GpExpandStatus():
self._number_new_segments = status_info
elif status == 'EXPANSION_PREPARE_STARTED':
self._input_filename = status_info
elif status == 'UPDATE_OLD_SEGMENTS_STARTED':
self._original_primary_count = int(status_info)
elif status == 'UPDATE_CATALOG_STARTED':
self._gp_segment_configuration_backup = status_info
......@@ -601,10 +596,6 @@ class GpExpandStatus():
""" Gets the number of new segments added """
return self._number_new_segments
def get_original_primary_count(self):
"""Returns the original number of primary segments"""
return self._original_primary_count
def get_gp_segment_configuration_backup(self):
"""Gets the filename of the gp_segment_configuration backup file
created during expansion setup"""
......@@ -618,6 +609,12 @@ class GpExpandStatus():
"""Returns True if running on standby"""
return os.path.exists(self._master_data_directory + self._status_standby_filename)
def can_rollback(self, status):
"""Return if it can rollback under current status"""
if int(self._status_values[status]) > int(self._status_values['UPDATE_CATALOG_DONE']):
return False
return True
# -------------------------------------------------------------------------
......@@ -874,30 +871,6 @@ class SegmentTemplate:
self.tempDir + '/gpexpand.*')
rmCmd.run(validateAfter=True)
self.logger.info('Adding new segments into template pg_hba.conf')
try:
fp = open(self.tempDir + '/pg_hba.conf', 'a')
try:
new_host_set = set()
for newSeg in self.gparray.getExpansionSegDbList() + self.gparray.getDbList():
host = newSeg.getSegmentHostName()
new_host_set.add(host)
for new_host in new_host_set:
addrinfo = socket.getaddrinfo(new_host, None)
ipaddrlist = list(set([(ai[0], ai[4][0]) for ai in addrinfo]))
fp.write('# %s\n' % new_host)
for addr in ipaddrlist:
fp.write(
'host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128'))
finally:
fp.close()
except IOError:
raise SegmentTemplateError('Failed to open %s/pg_hba.conf' % self.tempDir)
except Exception:
raise SegmentTemplateError('Failed to add new segments to template pg_hba.conf')
def _tar_template(self):
"""Tars up the template files"""
self.logger.info('Creating schema tar file')
......@@ -1177,7 +1150,6 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
def rollback(self, dburl):
"""Rolls back and expansion setup that didn't successfully complete"""
cleanSchema = False
status_history = self.statusLogger.get_status_history()
if not status_history:
raise ExpansionError('No status history to rollback.')
......@@ -1186,7 +1158,10 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
raise ExpansionError('Expansion preparation complete. Nothing to rollback')
for status in reversed(status_history):
if status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
if not self.statusLogger.can_rollback(status[0]):
raise ExpansionError('Catalog has been changed, the cluster can not rollback.')
elif status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
if self.statusLogger.is_standby():
self.logger.info('Running on standby master, skipping segment template rollback')
continue
......@@ -1201,32 +1176,16 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
self.statusLogger.get_seg_tarfile(),
self.gparray, removeDataDirs=True)
elif status[0] == 'UPDATE_OLD_SEGMENTS_STARTED':
self.logger.info('Rolling back update of original segments')
self.restore_original_segments()
elif status[0] == 'UPDATE_CATALOG_STARTED':
self.logger.info('Rolling back master update')
self.restore_master()
self.gparray = GpArray.initFromCatalog(dburl, utility=True)
elif status[0] == 'SETUP_EXPANSION_SCHEMA_STARTED':
cleanSchema = True
else:
self.logger.debug('Skipping %s' % status[0])
self.conn.close()
if cleanSchema:
self.logger.info('Dropping expansion expansion schema')
schema_conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods=True)
try:
dbconn.execSQL(schema_conn, drop_schema_sql)
schema_conn.commit()
schema_conn.close()
except:
pass # schema wasn't created yet.
self.statusLogger.remove_status_file()
self.statusLogger.remove_segment_configuration_backup_file()
......@@ -1363,56 +1322,7 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
raise ExpansionError(msg)
def update_original_segments(self):
"""Updates the pg_hba.conf file and updates the gp_id catalog table
of existing hosts"""
self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_STARTED', self.gparray.get_primary_count())
self.logger.info('Backing up pg_hba.conf file on original segments')
# backup pg_hba.conf file on original segments
for seg in self.old_segments:
if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
continue
hostname = seg.getSegmentHostName()
datadir = seg.getSegmentDataDirectory()
srcFile = datadir + '/pg_hba.conf'
dstFile = datadir + '/pg_hba.gpexpand.bak'
cpCmd = Scp(name='gpexpand back up pg_hba.conf file on original segments',
srcFile=srcFile, dstFile=dstFile, dstHost=hostname, ctxt=REMOTE,
remoteHost=hostname)
self.pool.addCommand(cpCmd)
self.pool.join()
try:
self.pool.check_results()
except ExecutionError, msg:
raise ExpansionError('Failed to configure original segments: %s' % msg)
# Copy the new pg_hba.conf file to original segments
self.logger.info('Copying new pg_hba.conf file to original segments')
for seg in self.old_segments:
if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
continue
hostname = seg.getSegmentHostName()
datadir = seg.getSegmentDataDirectory()
cpCmd = Scp(name='gpexpand copy new pg_hba.conf file to original segments',
srcFile=self.tempDir + '/pg_hba.conf', dstFile=datadir,
dstHost=hostname)
self.pool.addCommand(cpCmd)
self.pool.join()
try:
self.pool.check_results()
except ExecutionError, msg:
raise ExpansionError('Failed to configure original segments: %s' % msg)
"""Updates the gp_id catalog table of existing hosts"""
# Update the gp_id of original segments
self.newPrimaryCount = 0;
......@@ -1422,66 +1332,11 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
self.newPrimaryCount += self.gparray.get_primary_count()
self.logger.info('Configuring original segments')
if self.segTemplate:
self.segTemplate.cleanup()
# FIXME: update postmaster.opts
self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_DONE')
def restore_original_segments(self):
""" Restores the original segments back to their state prior the expansion
setup. This is only possible if the expansion setup has not completed
successfully."""
self.logger.info('Restoring original segments')
gp_segment_configuration_backup_file = self.statusLogger.get_gp_segment_configuration_backup();
if gp_segment_configuration_backup_file:
originalArray = GpArray.initFromFile(self.statusLogger.get_gp_segment_configuration_backup())
else:
originalArray = self.gparray
# Restore pg_hba.conf file from backup
self.logger.info('Restoring pg_hba.conf file on original segments')
for seg in originalArray.getSegDbList():
datadir = seg.getSegmentDataDirectory()
hostname = seg.getSegmentHostName()
srcFile = datadir + '/pg_hba.gpexpand.bak'
dstFile = datadir + '/pg_hba.conf'
cpCmd = Scp(name='gpexpand restore of pg_hba.conf file on original segments',
srcFile=srcFile, dstFile=dstFile, dstHost=hostname, ctxt=REMOTE,
remoteHost=hostname)
self.pool.addCommand(cpCmd)
self.pool.join()
try:
self.pool.check_results()
except:
# Setup didn't get this far so no backup to restore.
self.pool.empty_completed_items()
# note: this code may not be needed -- it will NOT change gp_id
# However, the call to gpconfigurenewsegment may still be doing some needed work (stopping the segment)
# which could be unnecessary or could be moved here)
self.logger.info('Restoring original segments catalog tables')
orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(originalArray.getSegDbList())
for host in iter(orig_segment_info):
segCfgCmd = ConfigureNewSegment(name='gpexpand configure new segments', confinfo=orig_segment_info[host],
verbose=gplog.logging_is_verbose(), batchSize=self.options.batch_size,
ctxt=REMOTE, remoteHost=host)
self.pool.addCommand(segCfgCmd)
self.pool.join()
try:
self.pool.check_results()
except ExecutionError:
raise ExpansionError('Failed to restore original segments')
def update_catalog(self):
"""
Starts the database, calls updateSystemConfig() to setup
......@@ -1528,7 +1383,10 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
dbconn.execSQL(self.conn, "select gp_expand_bump_version()")
self.conn.close()
inject_fault('gpexpand rollback test fault injection')
self.statusLogger.set_status('UPDATE_CATALOG_DONE')
self.pastThePointOfNoReturn = True;
# --------------------------------------------------------------------------
def cleanup_new_segments(self):
......@@ -1692,7 +1550,6 @@ Set PGDATABASE or use the -D option to specify the correct database to use.""" %
"""Removes the gpexpand status file and segment configuration backup file"""
self.statusLogger.remove_status_file()
self.statusLogger.remove_segment_configuration_backup_file()
self.pastThePointOfNoReturn = True;
def setup_schema(self):
"""Used to setup the gpexpand schema"""
......
......@@ -256,7 +256,7 @@ Feature: expand the cluster by adding more segments
Then gpexpand should return a return code of 0
And verify that the cluster has 1 new segments
And all the segments are running
And check segment conf: postgresql.conf pg_hba.conf
And check segment conf: postgresql.conf
And verify that the master pid has not been changed
@gpexpand_no_mirrors
......@@ -336,3 +336,26 @@ Feature: expand the cluster by adding more segments
When the user runs gpexpand against database "gptest" to redistribute
# Temporarily comment the verifys until redistribute is fixed. This allows us to commit a resource to get a dump of the ICW dump for other tests to use
# Then distribution information from table "public.redistribute" with data in "gptest" is verified against saved data
@gpexpand_no_mirrors
@gpexpand_rollback
Scenario: inject a fail and test if rollback is ok
Given a working directory of the test as '/tmp/gpexpand_behave'
And the database is killed on hosts "mdw,sdw1"
And the user runs command "rm -rf /tmp/gpexpand_behave/*"
And a temporary directory under "/data/gpdata/gpexpand/expandedData" to expand into
And the database is not running
And a cluster is created with no mirrors on "mdw" and "sdw1"
And database "gptest" exists
And there are no gpexpand_inputfiles
And the cluster is setup for an expansion on hosts "mdw,sdw1"
And the gp_segment_configuration have been saved
And the number of segments have been saved
And set fault inject "gpexpand rollback test fault injection"
And the user runs gpexpand interview to add 1 new segment and 0 new host "ignored.host"
When the user runs gpexpand with the latest gpexpand_inputfile without ret code check
Then gpexpand should return a return code of 3
And verify that the cluster has 1 new segments
And run rollback with database "gptest"
And verify the gp_segment_configuration has been restored
And unset fault inject
......@@ -2123,6 +2123,11 @@ def impl(context, additional_params=''):
if ret_code != 0:
raise Exception("gpexpand exited with return code: %d.\nstderr=%s\nstdout=%s" % (ret_code, std_err, std_out))
@when('the user runs gpexpand with the latest gpexpand_inputfile without ret code check')
def impl(context):
gpexpand = Gpexpand(context, working_directory=context.working_directory, database='gptest')
gpexpand.initialize_segments()
@when('the user runs gpexpand against database "{dbname}" to redistribute with duration "{duration}"')
def impl(context, dbname, duration):
_gpexpand_redistribute(context, dbname, duration)
......@@ -2182,6 +2187,59 @@ def impl(context):
query = """SELECT count(*) from gp_segment_configuration where -1 < content"""
context.start_data_segments = dbconn.execSQLForSingleton(conn, query)
@given('the gp_segment_configuration have been saved')
@when('the gp_segment_configuration have been saved')
@then('the gp_segment_configuration have been saved')
def impl(context):
dbname = 'gptest'
gp_segment_conf_backup = {}
with dbconn.connect(dbconn.DbURL(dbname=dbname)) as conn:
query = """SELECT count(*) from gp_segment_configuration where -1 < content"""
segment_count = int(dbconn.execSQLForSingleton(conn, query))
query = """SELECT * from gp_segment_configuration where -1 < content order by dbid"""
cursor = dbconn.execSQL(conn, query)
for i in range(0, segment_count):
dbid, content, role, preferred_role, mode, status,\
port, hostname, address, datadir = cursor.fetchone();
gp_segment_conf_backup[dbid] = {}
gp_segment_conf_backup[dbid]['content'] = content
gp_segment_conf_backup[dbid]['role'] = role
gp_segment_conf_backup[dbid]['preferred_role'] = preferred_role
gp_segment_conf_backup[dbid]['mode'] = mode
gp_segment_conf_backup[dbid]['status'] = status
gp_segment_conf_backup[dbid]['port'] = port
gp_segment_conf_backup[dbid]['hostname'] = hostname
gp_segment_conf_backup[dbid]['address'] = address
gp_segment_conf_backup[dbid]['datadir'] = datadir
context.gp_segment_conf_backup = gp_segment_conf_backup
@given('verify the gp_segment_configuration has been restored')
@when('verify the gp_segment_configuration has been restored')
@then('verify the gp_segment_configuration has been restored')
def impl(context):
dbname = 'gptest'
gp_segment_conf_backup = {}
with dbconn.connect(dbconn.DbURL(dbname=dbname)) as conn:
query = """SELECT count(*) from gp_segment_configuration where -1 < content"""
segment_count = int(dbconn.execSQLForSingleton(conn, query))
query = """SELECT * from gp_segment_configuration where -1 < content order by dbid"""
cursor = dbconn.execSQL(conn, query)
for i in range(0, segment_count):
dbid, content, role, preferred_role, mode, status,\
port, hostname, address, datadir = cursor.fetchone();
gp_segment_conf_backup[dbid] = {}
gp_segment_conf_backup[dbid]['content'] = content
gp_segment_conf_backup[dbid]['role'] = role
gp_segment_conf_backup[dbid]['preferred_role'] = preferred_role
gp_segment_conf_backup[dbid]['mode'] = mode
gp_segment_conf_backup[dbid]['status'] = status
gp_segment_conf_backup[dbid]['port'] = port
gp_segment_conf_backup[dbid]['hostname'] = hostname
gp_segment_conf_backup[dbid]['address'] = address
gp_segment_conf_backup[dbid]['datadir'] = datadir
if context.gp_segment_conf_backup != gp_segment_conf_backup:
raise Exception("gp_segment_configuration has not been restored")
@given('user has created {table_name} table')
def impl(context, table_name):
dbname = 'gptest'
......@@ -2430,8 +2488,8 @@ def impl(context, config_file):
run_gpcommand(context, 'gpinitsystem -a -c ../gpAux/gpdemo/clusterConfigFile -O %s' % config_file)
check_return_code(context, 0)
@when('check segment conf: postgresql.conf pg_hba.conf')
@then('check segment conf: postgresql.conf pg_hba.conf')
@when('check segment conf: postgresql.conf')
@then('check segment conf: postgresql.conf')
def step_impl(context):
query = "select dbid, port, hostname, datadir from gp_segment_configuration where content >= 0"
conn = dbconn.connect(dbconn.DbURL(dbname='postgres'))
......@@ -2454,25 +2512,6 @@ def step_impl(context):
raise Exception("port value in postgresql.conf of %s is incorrect. Expected:%s, given:%s" %
(hostname, port, dic['port']))
## check pg_hba.conf
remote_hba_conf = "%s/%s" % (datadir, 'pg_hba.conf')
local_hba_copy = os.path.join(os.getenv("MASTER_DATA_DIRECTORY"), "%s.%s" % ('pg_hba.conf', hostname))
cmd = Command(name="Copy remote conf to local to diff",
cmdStr='scp %s:%s %s' % (hostname, remote_hba_conf, local_hba_copy))
cmd.run(validateAfter=True)
f = open(local_hba_copy, 'r')
hba_content = f.read()
f.close()
addrinfo = socket.getaddrinfo(hostname, None)
ipaddrlist = list(set([(ai[0], ai[4][0]) for ai in addrinfo]))
key_word = '# %s\n' % hostname
for addr in ipaddrlist:
key_word += 'host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128')
if key_word not in hba_content:
raise Exception("Expected line not in pg_hba.conf,%s" % key_word)
@given('the transactions are started for dml')
def impl(context):
dbname = 'gptest'
......@@ -2540,3 +2579,24 @@ def _get_row_count_per_segment(table, dbname):
cursor = dbconn.execSQL(conn, query)
rows = cursor.fetchall()
return [row[1] for row in rows] # indices are the gp segment id's, so no need to store them explicitly
@given('set fault inject "{fault}"')
@then('set fault inject "{fault}"')
@when('set fault inject "{fault}"')
def impl(context, fault):
os.environ['GPMGMT_FAULT_POINT'] = fault
@given('unset fault inject')
@then('unset fault inject')
@when('unset fault inject')
def impl(context):
os.environ['GPMGMT_FAULT_POINT'] = ""
@given('run rollback with database "{database}"')
@then('run rollback with database "{database}"')
@when('run rollback with database "{database}"')
def impl(context, database):
gpexpand = Gpexpand(context, working_directory=context.working_directory, database=database)
ret_code, std_err, std_out = gpexpand.rollback()
if ret_code != 0:
raise Exception("rollback exited with return code: %d.\nstderr=%s\nstdout=%s" % (ret_code, std_err, std_out))
......@@ -80,6 +80,8 @@ class Gpexpand:
return run_gpcommand(self.context, "gpexpand -D %s %s" % (self.database, flags))
def rollback(self):
return run_gpcommand(self.context, "gpexpand -D %s -r" % (self.database))
if __name__ == '__main__':
gpexpand = Gpexpand()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册