Commit d41ca162 authored by Shoaib Lari, committed by David Krieger

gprecoverseg: Show progress of pg_basebackup on each segment

The gprecoverseg utility runs pg_basebackup in parallel on all segments that are
being recovered.  This commit logs the progress of each pg_basebackup on its
host and displays it to the user of gprecoverseg.  The progress files are
deleted upon successful completion of gprecoverseg.

Unit tests have also been added.
Authored-by: Shoaib Lari <slari@pivotal.io>
Co-authored-by: Mark Sliva <msliva@pivotal.io>
Co-authored-by: Jacob Champion <pchampion@pivotal.io>
Co-authored-by: Ed Espino <edespino@pivotal.io>
Co-authored-by: Kalen Krempely <kkrempely@pivotal.io>
Parent 9a2cf8cc
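For orientation: gprecoverseg computes one progress-file path per segment being recovered, passes it to gpconfigurenewsegment via the confinfo string, gpconfigurenewsegment directs pg_basebackup's log output to that file, and gprecoverseg tails the file in a loop to repaint per-segment status. A minimal sketch of the naming scheme used throughout the diff, assuming a hypothetical log directory:

import datetime

def progress_file_path(log_dir, dbid):
    # One file per run and per segment: the timestamp disambiguates runs,
    # the dbid disambiguates segments (format matches the diff below).
    timestamp = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')
    return '%s/pg_basebackup.%s.dbid%s.out' % (log_dir, timestamp, dbid)

print(progress_file_path('/home/gpadmin/gpAdminLogs', 2))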
......@@ -856,7 +856,9 @@ class ConfigureNewSegment(Command):
def __init__(self, name, confinfo, logdir, newSegments=False, tarFile=None,
batchSize=None, verbose=False,ctxt=LOCAL, remoteHost=None, validationOnly=False, writeGpIdFileOnly=False,
forceoverwrite=False):
cmdStr = '$GPHOME/bin/lib/gpconfigurenewsegment -c \"%s\" -l %s' % (confinfo, pipes.quote(logdir))
if newSegments:
cmdStr += ' -n'
if tarFile:
......@@ -899,7 +901,6 @@ class ConfigureNewSegment(Command):
: <segment dbid>
"""
result = {}
for segIndex, seg in enumerate(segments):
if primaryMirror == 'primary' and seg.isSegmentPrimary() == False:
continue
......@@ -923,13 +924,16 @@ class ConfigureNewSegment(Command):
isPrimarySegment = "false"
isTargetReusedLocationString = "false"
progressFile = getattr(seg, 'progressFile', "")
result[hostname] += '%s:%d:%s:%s:%d:%d:%s:%s:%s' % (seg.getSegmentDataDirectory(), seg.getSegmentPort(),
isPrimarySegment,
isTargetReusedLocationString,
seg.getSegmentDbId(),
seg.getSegmentContentId(),
primaryHostname,
primarySegmentPort,
progressFile
)
return result
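The confinfo record built above is a colon-delimited string, and this commit appends progressFile as a ninth field; gpconfigurenewsegment reads it back at index 8 (see seg[8] near the end of this diff). A small sketch of the consumer side, with hypothetical field values:

# Parsing one confinfo record (field order as produced above; index 8 is
# the progress file added by this commit, empty when not supplied).
record = '/data/mirror/gpseg0:40000:false:false:2:0:sdw1:40500:/tmp/pg_basebackup.20200101_120000.dbid2.out'
fields = record.split(':')
datadir, port = fields[0], int(fields[1])
progress_file = fields[8]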
......
import datetime
import os
import pipes
import signal
import time
......@@ -160,6 +162,17 @@ class GpMirrorListToBuild:
self.__logger = logger
class ProgressCommand(gp.Command):
"""
A Command, but with an associated DBID and log file path for use by
_join_and_show_segment_progress(). This class is tightly coupled to that
implementation.
"""
def __init__(self, name, cmdStr, dbid, filePath, ctxt, remoteHost):
super(GpMirrorListToBuild.ProgressCommand, self).__init__(name, cmdStr, ctxt, remoteHost)
self.dbid = dbid
self.filePath = filePath
def getMirrorsToBuild(self):
"""
Returns a newly allocated list
......@@ -396,14 +409,56 @@ class GpMirrorListToBuild:
(dbid, usedDataDirectories.get(path), hostName, path))
usedDataDirectories[path] = dbid
def _join_and_show_segment_progress(self, cmds, inplace=False, outfile=sys.stdout, interval=0.1):
written = False
def print_progress():
if written and inplace:
outfile.write("\x1B[%dA" % len(cmds))
output = []
for cmd in cmds:
try:
# since print_progress is called multiple times,
# cache cmdStr to reset it after being mutated by cmd.run()
cmd_str = cmd.cmdStr
cmd.run(validateAfter=True)
cmd.cmdStr = cmd_str
results = cmd.get_results().stdout.rstrip()
except ExecutionError:
lines = cmd.get_results().stderr.splitlines()
if lines:
results = lines[0]
else:
results = ''
output.append("%s (dbid %d): %s" % (cmd.remoteHost, cmd.dbid, results))
if inplace:
output.append("\x1B[K")
output.append("\n")
outfile.write("".join(output))
outfile.flush()
while not self.__pool.join(interval):
print_progress()
written = True
# Make sure every line is updated with the final status.
print_progress()
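The inplace mode leans on two ANSI escape sequences: \x1B[nA moves the cursor up n lines and \x1B[K erases from the cursor to the end of the line, so each refresh rewrites the previous block of status lines in place. A self-contained sketch of the same technique, with hypothetical progress snapshots:

import sys
import time

frames = [['dbid 2: 10%', 'dbid 4: 5%'],
          ['dbid 2: 60%', 'dbid 4: 55%'],
          ['dbid 2: done', 'dbid 4: done']]
for i, frame in enumerate(frames):
    if i > 0:
        sys.stdout.write('\x1B[%dA' % len(frame))  # cursor up N lines
    for line in frame:
        sys.stdout.write(line + '\x1B[K\n')        # rewrite line, erase leftovers
    sys.stdout.flush()
    time.sleep(0.5)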
def __runWaitAndCheckWorkerPoolForErrorsAndClear(self, cmds, actionVerb, suppressErrorCheck=False,
progressCmds=[]):
for cmd in cmds:
self.__pool.addCommand(cmd)
if self.__quiet:
self.__pool.join()
else:
base.join_and_indicate_progress(self.__pool)
if progressCmds:
self._join_and_show_segment_progress(progressCmds, inplace=True)
else:
base.join_and_indicate_progress(self.__pool)
if not suppressErrorCheck:
self.__pool.check_results()
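The contract assumed here is that WorkerPool.join(timeout) returns False while commands are still running and True once the pool has drained, so the progress loop wakes every interval seconds to rerun the tail commands and repaint. A reduced sketch of that control flow, with a stand-in pool object:

# Stand-in for gppylib.commands.base.WorkerPool, for illustration only.
class FakePool(object):
    def __init__(self, ticks):
        self.ticks = ticks
    def join(self, interval):
        self.ticks -= 1
        return self.ticks <= 0  # True once all queued commands are done

pool = FakePool(ticks=3)
while not pool.join(0.1):
    print('...refresh progress lines...')
print('...final refresh so every line shows terminal status...')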
......@@ -430,12 +485,15 @@ class GpMirrorListToBuild:
srcSegments = []
destSegments = []
isTargetReusedLocation = []
timeStamp = datetime.datetime.today().strftime('%Y%m%d_%H%M%S')
for directive in directives:
srcSegment = directive.getSrcSegment()
destSegment = directive.getDestSegment()
destSegment.primaryHostname = srcSegment.getSegmentHostName()
destSegment.primarySegmentPort = srcSegment.getSegmentPort()
destSegment.progressFile = '%s/pg_basebackup.%s.dbid%s.out' % (gplog.get_logger_dir(),
timeStamp,
destSegment.getSegmentDbId())
srcSegments.append(srcSegment)
destSegments.append(destSegment)
isTargetReusedLocation.append(directive.isTargetReusedLocation())
......@@ -457,7 +515,6 @@ class GpMirrorListToBuild:
remoteHost=hostName,
validationOnly=validationOnly,
forceoverwrite=self.__forceoverwrite)
#
# validate directories for target segments
#
......@@ -489,14 +546,47 @@ class GpMirrorListToBuild:
if validationErrors:
raise ExceptionNoStackTraceNeeded("\n" + ("\n".join(validationErrors)))
#
# Recover segments using gpconfigurenewsegment, which
# uses pg_basebackup. gprecoverseg generates a log filename which is
# passed to gpconfigurenewsegment as a confinfo parameter. gprecoverseg
# tails this file to show recovery progress to the user, and removes the
# file when done. A new file is generated for each run of
# gprecoverseg based on a timestamp.
#
# There is a race between when the pg_basebackup log file is created and
# when the progress command is run. Thus, the progress command touches
# the file to ensure it is present before tailing.
self.__logger.info('Configuring new segments')
cmds = []
progressCmds = []
removeCmds = []
for hostName in destSegmentByHost.keys():
for segment in destSegmentByHost[hostName]:
progressCmds.append(
GpMirrorListToBuild.ProgressCommand("tail the last line of the file",
"set -o pipefail; touch -a {0}; tail -1 {0} | tr '\\r' '\\n' | tail -1".format(
pipes.quote(segment.progressFile)),
segment.getSegmentDbId(),
segment.progressFile,
ctxt=base.REMOTE,
remoteHost=hostName))
removeCmds.append(
base.Command("remove file",
"rm -f %s" % pipes.quote(segment.progressFile),
ctxt=base.REMOTE,
remoteHost=hostName))
cmds.append(
createConfigureNewSegmentCommand(hostName, 'configure blank segments', False))
self.__runWaitAndCheckWorkerPoolForErrorsAndClear(cmds, "unpacking basic segment directory",
suppressErrorCheck=False,
progressCmds=progressCmds)
self.__runWaitAndCheckWorkerPoolForErrorsAndClear(removeCmds, "removing pg_basebackup progress logfiles",
suppressErrorCheck=False)
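About the tail pipeline above: pg_basebackup's progress output rewrites a single line using carriage returns, so the log file's last physical line accumulates many \r-separated samples. The command first touches the file (closing the creation race described in the comment), then tail -1 takes the last physical line, tr '\r' '\n' splits the samples, and the final tail -1 keeps only the newest one. The same extraction in Python, on hypothetical file contents:

# Mimics: tail -1 FILE | tr '\r' '\n' | tail -1
log_contents = 'pg_basebackup: starting\n 1024/204800 kB (0%)\r 102400/204800 kB (50%)\r 204800/204800 kB (100%)'
last_physical_line = log_contents.split('\n')[-1]                       # tail -1
latest_sample = last_physical_line.replace('\r', '\n').split('\n')[-1]  # tr + tail -1
print(latest_sample)  # ' 204800/204800 kB (100%)'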
#
# copy dump files from old segment to new segment
......
#!/usr/bin/env python
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
from gppylib.gparray import Segment, GpArray
from gppylib.test.unit.gp_unittest import *
import logging
import os
import shutil
import StringIO
import tempfile
from gppylib.commands import base
from mock import patch, MagicMock, Mock
from gppylib.operations.buildMirrorSegments import GpMirrorListToBuild
from gppylib.operations.startSegments import StartSegmentsResult
......@@ -27,8 +31,8 @@ class buildMirrorSegmentsTestCase(GpTestCase):
self.buildMirrorSegs = GpMirrorListToBuild(
toBuild = [],
pool = None,
quiet = True,
parallelDegree = 0,
logger=self.logger
)
......@@ -41,7 +45,7 @@ class buildMirrorSegmentsTestCase(GpTestCase):
expected_output = []
segs = self.buildMirrorSegs._get_running_postgres_segments(toBuild)
self.assertEquals(segs, expected_output)
@patch('gppylib.operations.buildMirrorSegments.get_pid_from_remotehost')
@patch('gppylib.operations.buildMirrorSegments.is_pid_postmaster', return_value=True)
@patch('gppylib.operations.buildMirrorSegments.check_pid_on_remotehost', return_value=True)
......@@ -89,14 +93,14 @@ class buildMirrorSegmentsTestCase(GpTestCase):
self.assertEquals(segs, expected_output)
@patch('gppylib.commands.base.Command.run')
@patch('gppylib.commands.base.Command.get_results', return_value=base.CommandResult(rc=0, stdout='/tmp/seg0', stderr='', completed=True, halt=False))
def test_dereference_remote_symlink_valid_symlink(self, mock1, mock2):
datadir = '/tmp/link/seg0'
host = 'h1'
self.assertEqual(self.buildMirrorSegs.dereference_remote_symlink(datadir, host), '/tmp/seg0')
@patch('gppylib.commands.base.Command.run')
@patch('gppylib.commands.base.Command.get_results', return_value=base.CommandResult(rc=1, stdout='', stderr='', completed=True, halt=False))
def test_dereference_remote_symlink_unable_to_determine_symlink(self, mock1, mock2):
datadir = '/tmp/seg0'
host = 'h1'
......@@ -174,5 +178,104 @@ class buildMirrorSegmentsTestCase(GpTestCase):
with self.assertRaisesRegexp(Exception, r"Segment dbid's 2 and 1 on host samehost cannot have the same data directory '/data'"):
self.buildMirrorSegs.checkForPortAndDirectoryConflicts(gpArray)
class SegmentProgressTestCase(GpTestCase):
"""
Test case for GpMirrorListToBuild._join_and_show_segment_progress().
"""
def setUp(self):
self.pool = Mock(spec=base.WorkerPool)
self.buildMirrorSegs = GpMirrorListToBuild(
toBuild=[],
pool=self.pool,
quiet=True,
parallelDegree=0,
logger=Mock(spec=logging.Logger)
)
def test_command_output_is_displayed_once_after_worker_pool_completes(self):
cmd = Mock(spec=base.Command)
cmd.remoteHost = 'localhost'
cmd.dbid = 2
cmd.get_results.return_value.stdout = "string 1\n"
cmd2 = Mock(spec=base.Command)
cmd2.remoteHost = 'host2'
cmd2.dbid = 4
cmd2.get_results.return_value.stdout = "string 2\n"
outfile = StringIO.StringIO()
self.pool.join.return_value = True
self.buildMirrorSegs._join_and_show_segment_progress([cmd, cmd2], outfile=outfile)
results = outfile.getvalue()
self.assertEqual(results, (
'localhost (dbid 2): string 1\n'
'host2 (dbid 4): string 2\n'
))
def test_command_output_is_displayed_once_for_every_blocked_join(self):
cmd = Mock(spec=base.Command)
cmd.remoteHost = 'localhost'
cmd.dbid = 2
cmd.get_results.side_effect = [Mock(stdout="string 1"), Mock(stdout="string 2")]
outfile = StringIO.StringIO()
self.pool.join.side_effect = [False, True]
self.buildMirrorSegs._join_and_show_segment_progress([cmd], outfile=outfile)
results = outfile.getvalue()
self.assertEqual(results, (
'localhost (dbid 2): string 1\n'
'localhost (dbid 2): string 2\n'
))
def test_inplace_display_uses_ansi_escapes_to_overwrite_previous_output(self):
cmd = Mock(spec=base.Command)
cmd.remoteHost = 'localhost'
cmd.dbid = 2
cmd.get_results.side_effect = [Mock(stdout="string 1"), Mock(stdout="string 2")]
cmd2 = Mock(spec=base.Command)
cmd2.remoteHost = 'host2'
cmd2.dbid = 4
cmd2.get_results.side_effect = [Mock(stdout="string 3"), Mock(stdout="string 4")]
outfile = StringIO.StringIO()
self.pool.join.side_effect = [False, True]
self.buildMirrorSegs._join_and_show_segment_progress([cmd, cmd2], inplace=True, outfile=outfile)
results = outfile.getvalue()
self.assertEqual(results, (
'localhost (dbid 2): string 1\x1b[K\n'
'host2 (dbid 4): string 3\x1b[K\n'
'\x1b[2A'
'localhost (dbid 2): string 2\x1b[K\n'
'host2 (dbid 4): string 4\x1b[K\n'
))
def test_errors_during_command_execution_are_displayed(self):
cmd = Mock(spec=base.Command)
cmd.remoteHost = 'localhost'
cmd.dbid = 2
cmd.get_results.return_value.stderr = "some error\n"
cmd.run.side_effect = base.ExecutionError("Some exception", cmd)
cmd2 = Mock(spec=base.Command)
cmd2.remoteHost = 'host2'
cmd2.dbid = 4
cmd2.get_results.return_value.stderr = ''
cmd2.run.side_effect = base.ExecutionError("Some exception", cmd2)
outfile = StringIO.StringIO()
self.pool.join.return_value = True
self.buildMirrorSegs._join_and_show_segment_progress([cmd, cmd2], outfile=outfile)
results = outfile.getvalue()
self.assertEqual(results, (
'localhost (dbid 2): some error\n'
'host2 (dbid 4): \n'
))
if __name__ == '__main__':
run_tests()
......@@ -104,9 +104,10 @@ class GpRecoversegTestCase(GpTestCase):
options = Options()
options.masterDataDirectory = self.temp_dir
options.spareDataDirectoryFile = self.config_file_path
options.showProgressInplace = True
# import HERE so that patches are already in place!
from gppylib.programs.clsRecoverSegment import GpRecoverSegmentProgram
self.subject = GpRecoverSegmentProgram(options)
self.subject.logger = Mock(spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal'])
......@@ -176,8 +177,9 @@ class GpRecoversegTestCase(GpTestCase):
options.masterDataDirectory = self.temp_dir
options.rebalanceSegments = True
options.spareDataDirectoryFile = None
options.showProgressInplace = True
# import HERE so that patches are already in place!
from gppylib.programs.clsRecoverSegment import GpRecoverSegmentProgram
self.subject = GpRecoverSegmentProgram(options)
self.subject.logger = Mock(spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal'])
......@@ -195,8 +197,9 @@ class GpRecoversegTestCase(GpTestCase):
options.masterDataDirectory = self.temp_dir
options.rebalanceSegments = True
options.spareDataDirectoryFile = None
options.showProgressInplace = True
# import HERE so that patches are already in place!
from gppylib.programs.clsRecoverSegment import GpRecoverSegmentProgram
self.subject = GpRecoverSegmentProgram(options)
self.subject.logger = Mock(spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal'])
......@@ -213,8 +216,9 @@ class GpRecoversegTestCase(GpTestCase):
options = Options()
options.masterDataDirectory = self.temp_dir
options.spareDataDirectoryFile = None
options.showProgressInplace = True
# import HERE so that patches are already in place!
from gppylib.programs.clsRecoverSegment import GpRecoverSegmentProgram
self.subject = GpRecoverSegmentProgram(options)
self.subject.logger = Mock(spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal'])
self.mock_get_mirrors_to_build.side_effect = self._get_test_mirrors
......@@ -235,8 +239,9 @@ class GpRecoversegTestCase(GpTestCase):
options = Options()
options.masterDataDirectory = self.temp_dir
options.spareDataDirectoryFile = None
options.showProgressInplace = True
# import HERE so that patches are already in place!
from gppylib.programs.clsRecoverSegment import GpRecoverSegmentProgram
self.subject = GpRecoverSegmentProgram(options)
self.subject.logger = Mock(spec=['log', 'warn', 'info', 'debug', 'error', 'warning', 'fatal'])
self.mock_get_mirrors_to_build.side_effect = self._get_test_mirrors
......
......@@ -40,7 +40,7 @@ class ValidationException(Exception):
class ConfExpSegCmd(Command):
def __init__(self, name, cmdstr, datadir, port, dbid, contentid, newseg, tarfile, useLighterDirectoryValidation, isPrimary,
syncWithSegmentHostname, syncWithSegmentPort, verbose, validationOnly, forceoverwrite, replicationSlotName, logfileDirectory, progressFile):
"""
@param useLighterDirectoryValidation if True then we don't require an empty directory; we just require that
database stuff is not there.
......@@ -57,7 +57,8 @@ class ConfExpSegCmd(Command):
self.syncWithSegmentHostname = syncWithSegmentHostname
self.syncWithSegmentPort = syncWithSegmentPort
self.replicationSlotName = replicationSlotName
self.logfileDirectory = logfileDirectory
self.progressFile = progressFile
#
# validationOnly if True then we validate directories and other simple things only; we don't
# actually configure the segment
......@@ -122,11 +123,15 @@ class ConfExpSegCmd(Command):
return
if not self.isPrimary:
# If the caller does not specify a pg_basebackup
# progress file, then create a temporary one and handle
# its deletion upon success.
shouldDeleteProgressFile = False
if not self.progressFile:
shouldDeleteProgressFile = True
self.progressFile = '%s/pg_basebackup.%s.dbid%s.out' % (gplog.get_logger_dir(),
datetime.datetime.today().strftime('%Y%m%d_%H%M%S'),
self.dbid)
# Create a mirror based on the primary
cmd = PgBaseBackup(pgdata=self.datadir,
host=self.syncWithSegmentHostname,
......@@ -134,13 +139,13 @@ class ConfExpSegCmd(Command):
replication_slot_name=self.replicationSlotName,
forceoverwrite=self.forceoverwrite,
target_gp_dbid=self.dbid,
logfile=self.progressFile)
try:
logger.info("Running pg_basebackup with progress output temporarily in %s" % self.progressFile)
cmd.run(validateAfter=True)
self.set_results(CommandResult(0, '', '', True, False))
if shouldDeleteProgressFile:
os.remove(self.progressFile)
except Exception, e:
self.set_results(CommandResult(1,'',e,True,False))
raise
......@@ -319,6 +324,7 @@ try:
# they will be junk data passed through the config.
primaryHostName = seg[6]
primarySegmentPort = int(seg[7])
progressFile = seg[8]
cmd = ConfExpSegCmd( name = 'Configure segment directory'
, cmdstr = ' '.join(sys.argv)
......@@ -336,6 +342,8 @@ try:
, validationOnly = options.validationOnly
, forceoverwrite = options.forceoverwrite
, replicationSlotName = options.replicationSlotName
, logfileDirectory = options.logfileDirectory
, progressFile = progressFile
)
pool.addCommand(cmd)
......