提交 4534d171 编写于 作者: S Shoaib Lari 提交者: Nadeem Ghani

gpexpand: Make it work with segwalrep.

The gpexpand utility and associated code are modified to work with the WALREP
code.  Previously, gpexpand only worked with the primaries and relied on Filerep
to populate the mirrors. We are changing gpexpand such that it initializes the
mirrors using pg_basebackup and sets them up for WAL replication.

Also, since the WALREP branch removed Filespaces, we have done the same and
replaced references to Filespaces with the Data Directory of the segments.

Author: Marbin Tan <mtan@pivotal.io>
Author: Shoaib Lari <slari@pivotal.io>
Author: Nadeem Ghani <nghani@pivotal.io>
上级 373760a5
......@@ -2558,16 +2558,15 @@ if (1)
# to get file system location and port number
$psql_str .= " -c \' ".
"select f.fselocation as fselocation, " .
"g.port " .
"from pg_filespace_entry f, gp_segment_configuration g " .
"where f.fsedbid = 1 and g.dbid = 1 \' ";
"select port, datadir " .
"from gp_segment_configuration " .
"where dbid = 1 \' ";
my $tabdef = `$psql_str`;
my $fseloc = tablelizer($tabdef);
my $port_and_datadir = tablelizer($tabdef);
pod2usage(-msg => "\nERROR: Could not connect to database. Please check connect string.\n\n",
-exitstatus => 1) unless (defined($fseloc));
-exitstatus => 1) unless (defined($port_and_datadir));
my $strtime = ctime();
$strtime =~ s/ /_/g;
$strtime =~ s/:/-/g;
......@@ -2576,7 +2575,7 @@ if (1)
print "Timestamp used in filenames for this run \"", $fselochsh, "\"\n\n";
# XXX XXX: port number!
$glob_port = $fseloc->[0]->{port};
$glob_port = $port_and_datadir->[0]->{port};
$psql_str = $glob_psql_str;
......@@ -2637,7 +2636,7 @@ if (1)
$psql_str .= $glob_connect if (defined($glob_connect));
my $arr = `pg_controldata $fseloc->[0]->{fselocation}`;
my $arr = `pg_controldata $port_and_datadir->[0]->{datadir}`;
my @lines = split(/\n/, $arr);
my %pairs = map { my ($key, $value) = split(/:\s+/); {$key => $value} } @lines;
......@@ -2646,7 +2645,7 @@ if (1)
$psql_str .= " -c \' checkpoint \'" ;
$tabdef = `$psql_str`;
$arr = `pg_controldata $fseloc->[0]->{fselocation}`;
$arr = `pg_controldata $port_and_datadir->[0]->{datadir}`;
@lines = split(/\n/, $arr);
my %pairs_post = map { my ($key, $value) = split(/:\s+/); {$key => $value} } @lines;
......@@ -2665,11 +2664,11 @@ if (1)
# Fetch the latest clog file and copy it at temp location to compare at end of tool
# if any diference is found means IO was performed while tool was running, which invalidates the results.
my @clog_list = `ls -t $fseloc->[0]->{fselocation}/pg_clog`;
my @clog_list = `ls -t $port_and_datadir->[0]->{datadir}/pg_clog`;
chomp($clog_list[0]);
my $prev_clog_file=$clog_list[0];
my $ret = `cp $fseloc->[0]->{fselocation}/pg_clog/${prev_clog_file} /tmp/${fselochsh}_clog.${prev_clog_file}`;
warn "Failed to copy file \"$fseloc->[0]->{fselocation}/pg_clog/${prev_clog_file}\", ERROR: $ret" if ($ret);
my $ret = `cp $port_and_datadir->[0]->{datadir}/pg_clog/${prev_clog_file} /tmp/${fselochsh}_clog.${prev_clog_file}`;
warn "Failed to copy file \"$port_and_datadir->[0]->{datadir}/pg_clog/${prev_clog_file}\", ERROR: $ret" if ($ret);
my $do_mm_skip = 0;
# MPP-10881: separate validation check for standby master
......@@ -3320,12 +3319,12 @@ pg_xlog
print "Please upload the same along with OUTPUT and FIX file while opening bug ticket.\n";
printdivider();
@clog_list = `ls -t $fseloc->[0]->{fselocation}/pg_clog`;
@clog_list = `ls -t $port_and_datadir->[0]->{datadir}/pg_clog`;
chomp($clog_list[0]);
if (($prev_clog_file ne $clog_list[0]) or
(length(`$glob_diff -b $fseloc->[0]->{fselocation}/pg_clog/$clog_list[0] /tmp/${fselochsh}_clog.$prev_clog_file`)))
(length(`$glob_diff -b $port_and_datadir->[0]->{datadir}/pg_clog/$clog_list[0] /tmp/${fselochsh}_clog.$prev_clog_file`)))
{
print "PreClogName=\"/tmp/${fselochsh}_clog.$prev_clog_file\", PostClogName=\"$fseloc->[0]->{fselocation}/pg_clog/$clog_list[0]\"";
print "PreClogName=\"/tmp/${fselochsh}_clog.$prev_clog_file\", PostClogName=\"$port_and_datadir->[0]->{datadir}/pg_clog/$clog_list[0]\"";
print "\nNOTE: Write happened to CLOG file while tool was running, indicating IO was perfomed on DB";
print "\nSince this tool requires no IO activity, above results reported cannot be trusted.\n";
printdivider();
......
......@@ -238,13 +238,12 @@ def delete_cluster(options):
logger.info('Deleting segments and removing data directories...')
for segdb in segments:
filespaces = segdb.getSegmentFilespaces().items()
for (oid, filespace) in filespaces:
logger.debug('Queueing up command to remove %s:%s' % (segdb.getSegmentHostName(),
filespace))
cmd = unix.RemoveDirectory('remove fs dir', filespace, ctxt=base.REMOTE,
remoteHost=segdb.getSegmentHostName())
pool.addCommand(cmd)
segmentDataDirectory = segdb.getSegmentDataDirectory()
logger.debug('Queueing up command to remove %s:%s' % (segdb.getSegmentHostName(),
segmentDataDirectory))
cmd = unix.RemoveDirectory('remove data dir', segmentDataDirectory, ctxt=base.REMOTE,
remoteHost=segdb.getSegmentHostName())
pool.addCommand(cmd)
logger.info('Waiting for worker threads to complete...')
pool.join()
......
......@@ -756,6 +756,44 @@ class SegmentTemplate:
self.pool.join()
self.pool.check_results()
def _start_new_primary_segments(self):
newSegments = self.gparray.getExpansionSegDbList()
for seg in newSegments:
if seg.isSegmentMirror() == True:
continue
""" Start all the new segments in utilty mode. """
segStartCmd = SegmentStart(
name="Starting new segment dbid %s on host %s." % (str(seg.getSegmentDbId()), seg.getSegmentHostName())
, gpdb=seg
, numContentsInCluster=0 # Starting seg on it's own.
, era=None
, mirrormode=MIRROR_MODE_MIRRORLESS
, utilityMode=True
, ctxt=REMOTE
, remoteHost=seg.getSegmentHostName()
, noWait=False
, timeout=SEGMENT_TIMEOUT_DEFAULT)
self.pool.addCommand(segStartCmd)
self.pool.join()
self.pool.check_results()
def _stop_new_primary_segments(self):
newSegments = self.gparray.getExpansionSegDbList()
for seg in newSegments:
if seg.isSegmentMirror() == True:
continue
segStopCmd = SegmentStop(
name="Stopping new segment dbid %s on host %s." % (str(seg.getSegmentDbId), seg.getSegmentHostName())
, dataDir=seg.getSegmentDataDirectory()
, mode='smart'
, nowait=False
, ctxt=REMOTE
, remoteHost=seg.getSegmentHostName()
)
self.pool.addCommand(segStopCmd)
self.pool.join()
self.pool.check_results()
def _configure_new_segments(self):
"""Configures new segments. This includes modifying the postgresql.conf file
and setting up the gp_id table"""
......@@ -773,9 +811,19 @@ class SegmentTemplate:
self.pool.join()
self.pool.check_results()
self._start_new_primary_segments()
self.logger.info('Configuring new segments (mirror)')
new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(),
primaryMirror='mirror')
mirrorsList = []
for segPair in self.gparray.getExpansionSegPairList():
mirror = segPair.mirrorDB
mirror.primaryHostname = segPair.primaryDB.getSegmentHostName()
mirror.primarySegmentPort = segPair.primaryDB.getSegmentPort()
mirrorsList.append(mirror)
new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(mirrorsList, primaryMirror='mirror')
for host in iter(new_segment_info):
segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
tarFile=self.schema_tar_file, newSegments=True,
......@@ -786,6 +834,8 @@ class SegmentTemplate:
self.pool.join()
self.pool.check_results()
self._stop_new_primary_segments()
def _fixup_template(self):
"""Copies postgresql.conf and pg_hba.conf files from a valid segment on the system.
Then modifies the template copy of pg_hba.conf"""
......
......@@ -1101,9 +1101,9 @@ class ConfigureNewSegment(Command):
: if primary then 'true' else 'false'
: if target is reused location then 'true' else 'false'
: <segment dbid>
"""
result = {}
for segIndex, seg in enumerate(segments):
if primaryMirror == 'primary' and seg.isSegmentPrimary() == False:
continue
......@@ -1115,16 +1115,37 @@ class ConfigureNewSegment(Command):
else:
result[hostname] = ''
isTargetReusedLocation = isTargetReusedLocationArr and isTargetReusedLocationArr[segIndex]
# only a mirror segment has these two attributes
# added on the fly, by callers
primaryHostname = getattr(seg, 'primaryHostname', "")
primarySegmentPort = getattr(seg, 'primarySegmentPort', "-1")
result[hostname] += '%s:%d:%s:%s:%d' % (seg.getSegmentDataDirectory(), seg.getSegmentPort(),
"true" if seg.isSegmentPrimary(current_role=True) else "false",
"true" if isTargetReusedLocation else "false",
seg.getSegmentDbId()
isTargetReusedLocation = isTargetReusedLocationArr and isTargetReusedLocationArr[segIndex]
result[hostname] += '%s:%d:%s:%s:%d:%s:%s' % (seg.getSegmentDataDirectory(), seg.getSegmentPort(),
"true" if seg.isSegmentPrimary(current_role=True) else "false",
"true" if isTargetReusedLocation else "false",
seg.getSegmentDbId(),
primaryHostname,
primarySegmentPort
)
return result
def buildSegInfo(seg, isTargetReusedLocation="false", primarySegment=None):
# if primaryMirror == 'primary' and seg.isSegmentPrimary() == False:
# continue
# elif primaryMirror == 'mirror' and seg.isSegmentPrimary() == True:
# continue
result = '%s:%d:%s:%s:%d:%s' % (seg.getSegmentDataDirectory(),
seg.getSegmentPort(),
"true" if seg.isSegmentPrimary(current_role=True) else "false",
isTargetReusedLocation,
seg.getSegmentDbId(),
"notapplicableonprimary",
"notapplicableonprimary",
#segPrimary.getSegmentHostName() if seg.isSegmentMirror() else "notapplicableonprimary"
)
return result
#-----------------------------------------------
class GpVersion(Command):
......
......@@ -1199,6 +1199,12 @@ class GpArray:
dbs.extend(segPair.get_dbs())
return dbs
# --------------------------------------------------------------------
def getExpansionSegPairList(self):
"""Returns a list of all SegmentPair objects that make up the new segments
of an expansion"""
return self.expansionSegmentPairs
# --------------------------------------------------------------------
def getSegmentContainingDb(self, db):
for segPair in self.segmentPairs:
......
......@@ -203,13 +203,6 @@ class GpConfigurationProviderUsingGpdbCatalog(GpConfigurationProvider) :
# gp_add_segment_primary() will update the mode and status.
# get the newly added segment's content id
# MPP-12393 et al WARNING: there is an unusual side effect going on here.
# Although gp_add_segment_primary() executed by __callSegmentAdd() above returns
# the dbId of the new row in gp_segment_configuration, the following
# select from gp_segment_configuration can return 0 rows if the updates
# done by __updateSegmentModeStatus()
# are not done first. Don't change the order of these operations unless you
# understand why gp_add_segment_primary() behaves as it does.
sql = "select content from pg_catalog.gp_segment_configuration where dbId = %s" % self.__toSqlIntValue(seg.getSegmentDbId())
logger.debug(sql)
sqlResult = self.__fetchSingleOutputRow(conn, sql)
......
......@@ -5,7 +5,7 @@ import tempfile
from StringIO import StringIO
from commands.base import CommandResult
from commands.gp import GpReadConfig
from commands.gp import GpReadConfig, ConfigureNewSegment
from gparray import Segment, GpArray, SegmentPair
import shutil
from mock import *
......
......@@ -17,7 +17,7 @@ from gppylib.gp_dbid import writeGpDbidFile
from time import sleep
from gppylib.operations.buildMirrorSegments import gDatabaseDirectories, gDatabaseFiles
description = ("""
description = ("""
Configure segment directories for installation into a pre-existing GPDB array.
......@@ -41,7 +41,7 @@ class ValidationException(Exception):
class ConfExpSegCmd(Command):
def __init__(self, name, cmdstr, datadir, port, dbid, newseg, tarfile, useLighterDirectoryValidation, isPrimary,
filespaceOids, filespaceDirs, verbose, validationOnly, writeGpDbidFileOnly):
syncWithSegmentHostname, syncWithSegmentPort, verbose, validationOnly, writeGpDbidFileOnly):
"""
@param useLighterDirectoryValidation if True then we don't require an empty directory; we just require that
database stuff is not there.
......@@ -53,9 +53,9 @@ class ConfExpSegCmd(Command):
self.tarfile = tarfile
self.verbose = verbose
self.useLighterDirectoryValidation = useLighterDirectoryValidation
self.filespaceOids = filespaceOids
self.filespaceDirs = filespaceDirs
self.isPrimary = isPrimary
self.syncWithSegmentHostname = syncWithSegmentHostname
self.syncWithSegmentPort = syncWithSegmentPort
#
# validationOnly if True then we validate directories and other simple things only; we don't
......@@ -66,39 +66,31 @@ class ConfExpSegCmd(Command):
self.writeGpDbidFileOnly = writeGpDbidFileOnly
Command.__init__(self, name, cmdstr)
def validatePath(self, path, isSystemFilespaceDir):
def validatePath(self, path):
"""
Raises ValidationException when a validation problem is detected
"""
if not os.path.exists(os.path.dirname(path)):
logger.info("gpconfigurenewsegment:validatePath path %s does not exist" % path)
raise ValidationException("Parent directory for %s directory '%s' does not exist" %
( "system data" if isSystemFilespaceDir else "filespace", path) )
("system data", path))
if not os.path.exists(path):
#
# dir doesn't exist is okay -- other scripts, or filerep mirroring code, will create it
#
return
if self.useLighterDirectoryValidation:
if isSystemFilespaceDir:
# validate that we don't contain database directories or files
for toCheck in gDatabaseDirectories:
if toCheck != "pg_log": # it's okay to have pg_log -- to save old logs around to look at
if os.path.exists(os.path.join(path, toCheck)):
raise ValidationException("Segment directory '%s' contains directory %s but should not!" %
(path, toCheck))
for toCheck in gDatabaseFiles:
# validate that we don't contain database directories or files
for toCheck in gDatabaseDirectories:
if toCheck != "pg_log": # it's okay to have pg_log -- to save old logs around to look at
if os.path.exists(os.path.join(path, toCheck)):
raise ValidationException("Segment directory '%s' contains file %s but should not!" %
raise ValidationException("Segment directory '%s' contains directory %s but should not!" %
(path, toCheck))
else:
for name in os.listdir( path ):
if name[0] in "0123456789": # the database files here will have
raise ValidationException("Filespace directory contains invalid file(s): '%s'" % path)
for toCheck in gDatabaseFiles:
if os.path.exists(os.path.join(path, toCheck)):
raise ValidationException("Segment directory '%s' contains file %s but should not!" %
(path, toCheck))
else:
# it better be empty
if len(os.listdir(path)) != 0:
......@@ -109,27 +101,6 @@ class ConfExpSegCmd(Command):
os.chmod(path, 0700)
else:
os.mkdir(path, 0700)
def fixupFilespaces(self):
"""
This method will take file spaces stored in the system data directory and put them in their proper locations.
"""
if self.filespaceOids == None:
return
try:
logger.info("copy filespaces to their locations")
# only on primary do we create it -- otherwise, mirroring code creates it on replay
filespaceDataDir = self.datadir + "/" + DESTINATION_FILE_SPACES_DIRECTORY
i = 0
for i in range(len(self.filespaceOids)):
sourceDir = filespaceDataDir + "/" + str(self.filespaceOids[i])
targetDir = self.filespaceDirs[i]
cpCmd = unix.LocalDirCopy("copy filespace from %s to %s" % (sourceDir, targetDir), sourceDir, targetDir)
cpCmd.run(validateAfter=True)
unix.RemoveDirectory.local("remove filespace template after copy to proper location", filespaceDataDir)
except Exception, e:
self.set_results(CommandResult(1, '', e, True, False))
raise
def run(self):
try:
......@@ -141,9 +112,7 @@ class ConfExpSegCmd(Command):
# make directories, extract template update postgresql.conf
logger.info("Validate data directories for new segment")
try:
self.validatePath(self.datadir, True)
for path in self.filespaceDirs:
self.validatePath(path, False)
self.validatePath(self.datadir)
except ValidationException, e:
msg = "for segment with port %s: %s" % (self.port, e.getMessage())
self.set_results(CommandResult(1, '', msg, True, False))
......@@ -153,16 +122,37 @@ class ConfExpSegCmd(Command):
raise
if self.validationOnly:
self.set_results(CommandResult(0, '', '', True, False))
self.set_results(CommandResult(0, '', '', True, False))
return
if not self.isPrimary:
# Create a mirror based on the primary
# We exclude certain unnecessary directories from being copied as they will greatly
# slow down the speed of gpinitstandby if containing a lot of data
cmd_str = ' '.join(['pg_basebackup',
'-x', '-R',
'-c', 'fast',
'-E', './pg_log',
'-E', './db_dumps',
'-E', './gpperfmon/data',
'-E', './gpperfmon/logs',
'-D', self.datadir,
'-h', self.syncWithSegmentHostname,
'-p', str(self.syncWithSegmentPort)])
cmd = Command('pg_basebackup', cmd_str)
try:
cmd.run(validateAfter=True)
self.set_results(CommandResult(0, '', '', True, False))
except Exception, e:
self.set_results(CommandResult(1,'',e,True,False))
raise
logger.info("ran pg_backbackup: %s" % cmd.cmdStr)
return
logger.info("Create or update data directories for new segment")
try:
self.makeOrUpdatePathAsNeeded(self.datadir)
if self.isPrimary:
# only on primary do we create it -- otherwise, mirroring code creates it on replay
for path in self.filespaceDirs:
self.makeOrUpdatePathAsNeeded(path)
except Exception, e:
self.set_results(CommandResult(1,'',e,True,False))
raise
......@@ -177,10 +167,6 @@ class ConfExpSegCmd(Command):
logger.error(extractTarCmd.get_results())
raise
if self.isPrimary:
# only on primary do we create it -- otherwise, mirroring code creates it on replay
self.fixupFilespaces()
logger.info("create gp_dbid file for segment")
writeGpDbidFile(self.datadir, self.dbid, logger=gplog.get_logger_if_verbose())
......@@ -209,7 +195,7 @@ class ConfExpSegCmd(Command):
stopCmd.run(validateAfter=True)
except:
pass
except Exception, e:
self.set_results(CommandResult(1, '', e, True, False))
if self.verbose:
......@@ -229,7 +215,7 @@ def getOidDirLists(oidDirs):
dirList.append(oidDirs[i])
i = i + 1
return oidList, dirList
def parseargs():
parser = OptParser(option_class=OptChecker,
......@@ -237,9 +223,9 @@ def parseargs():
version='%prog version $Revision: $')
parser.set_usage('%prog is a utility script used by gpexpand, gprecoverseg, and gpaddmirrors and is not intended to be run separately.')
parser.remove_option('-h')
parser.add_option('-v','--verbose', action='store_true', help='debug output.')
parser.add_option('-c', '--confinfo', type='string')
parser.add_option('-c', '--confinfo', type='string')
parser.add_option('-t', '--tarfile', type='string')
parser.add_option('-n', '--newsegments', action='store_true')
parser.add_option('-B', '--batch-size', type='int', default=DEFAULT_BATCH_SIZE, metavar='<batch_size>')
......@@ -247,7 +233,7 @@ def parseargs():
parser.add_option("-W", "--write-gpid-file-only", dest="writeGpidFileOnly", action='store_true', default=False)
parser.set_defaults(verbose=False, filters=[], slice=(None, None))
# Parse the command line arguments
(options, args) = parser.parse_args()
......@@ -297,18 +283,22 @@ try:
(options, args, seg_info) = parseargs()
if options.verbose:
gplog.enable_verbose_logging()
logger.info("Starting gpconfigurenewsegment with args: %s" % ' '.join(sys.argv[1:]))
pool = WorkerPool(numWorkers=options.batch_size)
for seg in seg_info:
dataDir = seg[0]
port = seg[1]
isPrimary = seg[2] == "true"
directoryValidationLevel = seg[3] == "true"
dbid = int(seg[4])
filespaceOids, filespaceDirs = getOidDirLists(seg[5:])
# These variables should not be used if it's a primary
# they will be junk data passed through the config.
primaryHostName = seg[5]
primarySegmentPort = int(seg[6])
cmd = ConfExpSegCmd( name = 'Configure segment directory'
, cmdstr = ' '.join(sys.argv)
......@@ -319,8 +309,8 @@ try:
, tarfile = options.tarfile
, useLighterDirectoryValidation = directoryValidationLevel
, isPrimary = isPrimary
, filespaceOids = filespaceOids
, filespaceDirs = filespaceDirs
, syncWithSegmentHostname = primaryHostName
, syncWithSegmentPort = primarySegmentPort
, verbose = options.verbose
, validationOnly = options.validationOnly
, writeGpDbidFileOnly = options.writeGpidFileOnly
......
......@@ -566,7 +566,7 @@ gp_add_segment_mirror(PG_FUNCTION_ARGS)
new.db.dbid = get_availableDbId();
new.db.mode = GP_SEGMENT_CONFIGURATION_MODE_NOTINSYNC;
new.db.status = GP_SEGMENT_CONFIGURATION_STATUS_DOWN;
new.db.status = GP_SEGMENT_CONFIGURATION_STATUS_UP;
new.db.role = GP_SEGMENT_CONFIGURATION_ROLE_MIRROR;
new.db.preferred_role = GP_SEGMENT_CONFIGURATION_ROLE_MIRROR;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册