未验证 提交 cea594f0 编写于 作者: J Jialun 提交者: GitHub

Gpexpand minor fix (#7159)

- get port from MASTER_DATA_DIRECTORY, so there is no confusion if
  PGPORT and MASTER_DATA_DIRECTORY are set to different clusters
- delete tmp status file 'gpexpand.standby.status' and copy the
  status file to standby directly
- get standby data directory from catalog instead of assuming its
  same with master
- copy gp_segment_configuration backup file to standby also, so
  standby can restore this catalog if master is down
上级 910dd882
......@@ -208,8 +208,6 @@ def validate_options(options, args, parser):
logger.error('Master data directory %s does not exist.' % options.master_data_directory)
parser.exit()
options.pgport = int(os.getenv('PGPORT', 5432))
return options, args
......@@ -380,9 +378,12 @@ class GpExpandStatus():
self._master_data_directory = master_data_directory
self._master_mirror = master_mirror
self._status_filename = master_data_directory + '/gpexpand.status'
self._status_standby_filename = master_data_directory + '/gpexpand.standby.status'
if master_mirror:
self._status_standby_filename = master_mirror.getSegmentDataDirectory() \
+ '/gpexpand.status'
self._segment_configuration_standby_filename = master_mirror.getSegmentDataDirectory() \
+ '/' + SEGMENT_CONFIGURATION_BACKUP_FILE
self._fp = None
self._fp_standby = None
self._temp_dir = None
self._input_filename = None
self._gp_segment_configuration_backup = None
......@@ -422,11 +423,6 @@ class GpExpandStatus():
"""Creates a new gpexpand status file"""
try:
self._fp = open(self._status_filename, 'w')
if self._master_mirror:
self._fp_standby = open(self._status_standby_filename, 'w')
self._fp_standby.write('UNINITIALIZED:None\n')
self._fp_standby.flush()
os.fsync(self._fp_standby)
self._fp.write('UNINITIALIZED:None\n')
self._fp.flush()
os.fsync(self._fp)
......@@ -435,16 +431,16 @@ class GpExpandStatus():
except IOError:
raise
self._sync_status_file()
if self._master_mirror:
self._sync_status_file()
def _sync_status_file(self):
"""Syncs the gpexpand status file with the master mirror"""
if self._master_mirror:
cpCmd = Scp('gpexpand copying status file to master mirror',
srcFile=self._status_standby_filename,
dstFile=self._status_filename,
dstHost=self._master_mirror.getSegmentHostName())
cpCmd.run(validateAfter=True)
cpCmd = Scp('gpexpand copying status file to master mirror',
srcFile=self._status_filename,
dstFile=self._status_standby_filename,
dstHost=self._master_mirror.getSegmentHostName())
cpCmd.run(validateAfter=True)
def set_status(self, status, status_info=None, force=False):
"""Sets the current status. gpexpand status must be set in
......@@ -463,16 +459,13 @@ class GpExpandStatus():
self._status_values[status] != self._status_values[self._status[-1]] + 1 and \
not force:
raise InvalidStatusError('Invalid status transition from %s to %s' % (self._status[-1], status))
if self._master_mirror:
self._fp_standby.write('%s:%s\n' % (status, status_info))
self._fp_standby.flush()
os.fsync(self._fp_standby)
self._sync_status_file()
self._fp.write('%s:%s\n' % (status, status_info))
self._fp.flush()
os.fsync(self._fp)
self._status.append(status)
self._status_info.append(status_info)
if self._master_mirror:
self._sync_status_file()
def get_current_status(self):
"""Gets the current status that has been written to the gpexpand
......@@ -491,17 +484,12 @@ class GpExpandStatus():
if self._fp:
self._fp.close()
self._fp = None
if self._fp_standby:
self._fp_standby.close()
self._fp_standby = None
if os.path.exists(self._status_filename):
os.unlink(self._status_filename)
if os.path.exists(self._status_standby_filename):
os.unlink(self._status_standby_filename)
if self._master_mirror:
RemoveFile.remote('gpexpand master mirror status file cleanup',
self._master_mirror.getSegmentHostName(),
self._status_filename)
self._status_standby_filename)
def remove_segment_configuration_backup_file(self):
""" Remove the segment configuration backup file """
......@@ -509,6 +497,20 @@ class GpExpandStatus():
if self._gp_segment_configuration_backup != None and os.path.exists(
self._gp_segment_configuration_backup) == True:
os.unlink(self._gp_segment_configuration_backup)
if self._master_mirror:
RemoveFile.remote('gpexpand master mirror segment configuration backup file cleanup',
self._master_mirror.getSegmentHostName(),
self._segment_configuration_standby_filename)
def sync_segment_configuration_backup_file(self):
""" Sync the segment configuration backup file to standby """
if self._master_mirror:
self.logger.debug("Sync segment configuration backup file")
cpCmd = Scp('gpexpand copying segment configuration backup file to master mirror',
srcFile=self._gp_segment_configuration_backup,
dstFile=self._segment_configuration_standby_filename,
dstHost=self._master_mirror.getSegmentHostName())
cpCmd.run(validateAfter=True)
def get_temp_dir(self):
"""Gets temp dir that was used during template creation"""
......@@ -535,10 +537,6 @@ class GpExpandStatus():
"""Sets the filename of the gp_segment_configuration backup file"""
self._gp_segment_configuration_backup = filename
def is_standby(self):
"""Returns True if running on standby"""
return os.path.exists(self._master_data_directory + self._status_standby_filename)
def can_rollback(self, status):
"""Return if it can rollback under current status"""
if int(self._status_values[status]) >= int(self._status_values['UPDATE_CATALOG_DONE']):
......@@ -553,8 +551,6 @@ class GpExpandStatus():
if not self._fp:
self._fp = open(self._status_filename, 'a+')
if self._master_mirror and not self._fp_standby:
self._fp_standby = open(self._status_standby_filename, 'a+')
self.set_status(status, status_info, True)
......@@ -959,9 +955,6 @@ class gpexpand:
raise ExpansionError('Catalog has been changed, the cluster can not rollback.')
elif status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
if self.statusLogger.is_standby():
self.logger.info('Running on standby master, skipping segment template rollback')
continue
self.logger.info('Rolling back segment template build')
SegmentTemplate.cleanup_build_segment_template('gpexpand_schema.tar', status[1])
......@@ -1144,6 +1137,7 @@ class gpexpand:
self.options.master_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
self.gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())
self.statusLogger.sync_segment_configuration_backup_file()
# Mark expansion segment primaries not in sync
for seg in self.gparray.getExpansionSegDbList():
......@@ -2229,7 +2223,7 @@ def main(options, args, parser):
configurationImplGpdb.GpConfigurationProviderUsingGpdbCatalog())
configurationInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort())
dburl = dbconn.DbURL(dbname=DBNAME)
dburl = dbconn.DbURL(dbname=DBNAME, port=gpEnv.getMasterPort())
gpexpand_db_status = gpexpand.prepare_gpdb_state(logger, dburl, options)
......
......@@ -626,7 +626,7 @@ class GpRecoverSegmentProgram:
confProvider.sendPgElogFromMaster("Recovery of %d segment(s) has been started." % \
len(mirrorBuilder.getMirrorsToBuild()), True)
self.trigger_fts_probe(gpArray)
self.trigger_fts_probe(port=gpEnv.getMasterPort())
self.logger.info("******************************************************************")
self.logger.info("Updating segments for streaming is completed.")
......@@ -636,9 +636,9 @@ class GpRecoverSegmentProgram:
sys.exit(0)
def trigger_fts_probe(self, gpArray):
def trigger_fts_probe(self, port=0):
self.logger.info('Triggering FTS probe')
with dbconn.connect(dbconn.DbURL()) as conn:
with dbconn.connect(dbconn.DbURL(port=port)) as conn:
res = dbconn.execSQL(conn, "SELECT gp_request_fts_probe_scan()")
return res.fetchall()
......
......@@ -407,10 +407,10 @@ Feature: expand the cluster by adding more segments
And database "gptest" exists
And there are no gpexpand_inputfiles
And the cluster is setup for an expansion on hosts "mdw,sdw1"
And the gp_segment_configuration have been saved
And set fault inject "gpexpand retry after releaseing catalog lock fault injection"
When the user runs gpexpand with a static inputfile for a single-node cluster with mirrors without ret code check
Then gpexpand should return a return code of 3
And verify status file and gp_segment_configuration backup file exist on standby
And unset fault inject
When the user runs gpexpand with a static inputfile for a single-node cluster with mirrors without ret code check
Then gpexpand should return a return code of 0
......@@ -2810,3 +2810,37 @@ def impl(context, dbname):
UPDATE pg_class SET reltoastrelid = 0 WHERE oid = 'borked'::regclass;
""")
conn.commit()
@then('verify status file and gp_segment_configuration backup file exist on standby')
def impl(context):
status_file = 'gpexpand.status'
gp_segment_configuration_backup = 'gpexpand.gp_segment_configuration'
query = "select hostname, datadir from gp_segment_configuration where content = -1 order by dbid"
conn = dbconn.connect(dbconn.DbURL(dbname='postgres'))
res = dbconn.execSQL(conn, query).fetchall()
master = res[0]
standby = res[1]
master_datadir = master[1]
standby_host = standby[0]
standby_datadir = standby[1]
standby_remote_statusfile = "%s:%s/%s" % (standby_host, standby_datadir, status_file)
standby_local_statusfile = "%s/%s.standby" % (master_datadir, status_file)
standby_remote_gp_segment_configuration_file = "%s:%s/%s" % \
(standby_host, standby_datadir, gp_segment_configuration_backup)
standby_local_gp_segment_configuration_file = "%s/%s.standby" % \
(master_datadir, gp_segment_configuration_backup)
cmd = Command(name="Copy standby file to master", cmdStr='scp %s %s' % \
(standby_remote_statusfile, standby_local_statusfile))
cmd.run(validateAfter=True)
cmd = Command(name="Copy standby file to master", cmdStr='scp %s %s' % \
(standby_remote_gp_segment_configuration_file, standby_local_gp_segment_configuration_file))
cmd.run(validateAfter=True)
if not os.path.exists(standby_local_statusfile):
raise Exception('file "%s" is not exist' % standby_remote_statusfile)
if not os.path.exists(standby_local_gp_segment_configuration_file):
raise Exception('file "%s" is not exist' % standby_remote_gp_segment_configuration_file)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册