#!/usr/bin/env python # -*- coding: utf-8 -*- ''' gpmigrator [options] old-gphome new-gphome Options: -h, --help show this help message and exit -v, --version show the program's version number and exit -q quiet mode -d DIRECTORY the master host data directory -l DIRECTORY log file directory -R revert a previous gpmigrator run --debug debugging information ''' #============================================================ import sys, os # Python version 2.6.2 is expected, must be between 2.5-3.0 if sys.version_info < (2, 5, 0) or sys.version_info >= (3, 0, 0): sys.stderr.write("Error: %s is supported on Python versions 2.5 or greater\n" "Please upgrade python installed on this machine." % os.path.split(__file__)[-1]) sys.exit(1) try: from gppylib.operations.gpMigratorUtil import * except ImportError, e: sys.exit('Error: unable to import module: ' + str(e)) libdir = os.path.join(sys.path[0], 'lib/') logger = get_default_logger() EXECNAME = os.path.split(__file__)[-1] MIGRATIONUSER = 'gpmigrator' LOCKEXT = '.gpmigrator_orig' WORKDIR = 'gpmigrator' BACKUPDIR = 'backup' UPGRADEDIR = 'upgrade' PARALLELISM = 16 #============================================================ __version__ = '$Revision$' #============================================================ def makeCommand(oldHome, newHome, oldVersion, newVersion, command, pid, dataDirectories, filespaces, option1, option2, method, isUpgrade, isRevert, mirrors): # space separated list of directories datadirs = base64.urlsafe_b64encode(pickle.dumps(dataDirectories)) fse = base64.urlsafe_b64encode(pickle.dumps(filespaces)) hasMirrors = len(mirrors) > 0 cmd = [ EXECNAME, oldHome, newHome, '--internal-oldversion=' + urllib.quote(str(oldVersion)), '--internal-newversion=' + urllib.quote(str(newVersion)), '--internal-command=' + str(command), '--internal-pid=' + str(pid), '--internal-dirs=' + urllib.quote(datadirs), '--internal-filespaces=' + urllib.quote(fse), '--internal-option=' + urllib.quote(str(option1)), '--internal-option2=' + urllib.quote(str(option2)), '--debug', '--quiet', ] if method: cmd.append('--internal-method='+ urllib.quote(str(method))) if isUpgrade: cmd.append('--internal-isupgrade') if isRevert: cmd.append('-R') if hasMirrors: cmd.append('--internal-mirrors') return cmd #============================================================ class GpControlData(base.Command): """ Get the control data for a specified directory. """ def __init__(self, name, directory, ctxt=base.LOCAL, remoteHost=None): self.controldata = None cmdStr = "$GPHOME/bin/pg_controldata %s" % directory base.Command.__init__(self, name, cmdStr, ctxt, remoteHost) def get_controldata(self): if not self.controldata: self.controldata = {} for line in self.results.stdout.split('\n'): try: (key, value) = line.split(':', 1) self.controldata[key] = value.strip() except: pass return self.controldata @staticmethod def local(name,directory): cmd=GpControlData(name,directory) cmd.run(validateAfter=True) return cmd.get_controldata() #============================================================ class GPUpgrade(GPUpgradeBase): ''' Greenplum Database Upgrade Utility ''' # INTERNAL command: # MASTER - default, run on master, [not included in list] # MKDIR - Create directories, run on every host # RMDIR - Remove directories, run on every host # CHKDIR - Check directories, run on every host # RESETXLOG - Syncronize xlogs # DEEPLINK - Recursive hardlink # EXTRACTCATFILES - build a list of catalog files to be copied # BUILDSKEL - build a skeleton copy of the old system # TRANSFORMCAT - apply catalog transformations # SETCATVERSION - set catalog version # EXTRACTAOSEGS - get ao seg info # GETHIGHESTOID - find the highest OID in the cluster commands = ['SETSTATE', 'MKDIR', 'RMDIR', 'CHKDIR', 'RESETXLOG', 'DEEPLINK', 'CHKDOWN', 'LOCKDOWN', 'UNLOCK', 'EXTRACTCATFILES', 'BUILDSKEL', 'TRANSFORMCAT', 'SETCATVERSION', 'EXTRACTAOSEGS', 'GETHIGHESTOID', 'TOUCHUPCONFIG', 'FIXCONFIG'] #------------------------------------------------------------ def __init__(self): ''' The most basic of initialization ''' super(GPUpgrade, self).__init__() self.resume = False self.mirrormode = None self.pid = os.getpid() # Process ID of the master # Non-master variables self.option2 = None # Argument passed to cmd # Environment and system info self.datadirs = None # Set of all data and mirror directories self.filespaces= None # Set of filespace directory (dboid as key) self.host = None # master hostname self.upgrade = None # True for upgrade, False for downgrade self.method = None # transform or dumprestore ? # Upgrade information and state self.quiet = False self.revert = False self.checkschema = True self.warnings = False self.gpperfmon = None self.oldversion = None self.newversion = None self.newmaster = None # directory: workdir/upgrade/gp-1 self.state = None # file: backup/state self.config = {} # gp_init_config for new db self.segments = [] # list of segments #------------------------------------------------------------ def Setup(self): ''' Basic initialization, separate from __init__ for exception handling purposes. ''' # GPHOME, PYTHONPATH must be setup properly # GPHOME must match the location of this file # The first PYTHONPATH must be based on the proper gphome gphome_bin = os.path.realpath(os.path.split(__file__)[0]) gphome = os.path.split(gphome_bin)[0] env_GPHOME = os.path.realpath(os.environ.get('GPHOME')) if (env_GPHOME != gphome): logger.fatal(" $GPHOME is set to %s which is not newhome" % env_GPHOME) logger.fatal(' source the greenplum.sh from the newhome to setup env ') raise UpgradeError('Initialization failed') pythonpath = os.path.join(gphome, "lib", "python") env_PYTHONPATH = os.path.realpath(os.environ.get('PYTHONPATH').split(':')[0]) if (env_PYTHONPATH != pythonpath): logger.fatal(' $PYTHONPATH is incorrect ') logger.fatal(' source the greenplum.sh from the newhome to setup env ') raise UpgradeError('Initialization failed') # This is the same path used by gpinitsystem self.path = '/usr/kerberos/bin:/usr/sfw/bin:/opt/sfw/bin' self.path += ':/usr/local/bin:/bin:/usr/bin:/sbin:/usr/sbin:/usr/ucb' self.path += ':/sw/bin' # Set defaults self.user = os.environ.get('USER') or os.environ.get('LOGNAME') self.host = self.RunCmd('hostname') self.masterdir = os.environ.get('MASTER_DATA_DIRECTORY') self.ParseInput() # Setup worker pool self.pool = base.WorkerPool(numWorkers=PARALLELISM); # Extract path from input master directory if self.cmd == 'MASTER': logger.info('Beginning upgrade') logger.info('Checking configuration') if not self.masterdir or len(self.masterdir) == 0: raise UpgradeError('MASTER_DATA_DIRECTORY is not defined') self.masterdir = self.masterdir.rstrip('/') # The configuration file conf = os.path.join(self.masterdir, 'postgresql.conf') # Simple function to look for settings in the conf file def getconf(x): conf_re = re.compile('^\s*%s\s*=\s*(\w+)' % x) try: conf_str = self.RunCmd('grep %s %s' % (x, conf)) except CmdError: conf_str = "" # grep returns errorcode on no match value = None for line in conf_str.split('\n'): match = conf_re.search(line) if match: value = match.group(1) return value # Find the port for this segment: self.masterport = getconf('port') if self.masterport == None: raise UpgradeError('Could not determine master port from ' + conf) self.masterport = int(self.masterport) # Determine if perfmon is enabled self.gpperfmon = getconf('gp_enable_gpperfmon') self.sock_dir = getconf('unix_socket_directory') if not self.sock_dir: self.sock_dir = '/tmp/' # Verify that (max_connections == superuser_reserved_connections) # max_conn = getconf('max_connections') # reserved = getconf('superuser_reserved_connections') masterpath, masterdir = os.path.split(self.masterdir) self.workdir = os.path.join(masterpath, WORKDIR) self.newmaster = os.path.join(self.workdir, UPGRADEDIR, masterdir) self.oldenv = self.SetupEnv(self.oldhome, self.masterdir) self.newenv = self.SetupEnv(self.newhome, self.newmaster) else: self.oldenv = self.SetupEnv(self.oldhome, None) self.newenv = self.SetupEnv(self.newhome, None) #------------------------------------------------------------ def ParseInput(self): ''' Parses and validates input to the script ''' try: parser = optparse.OptionParser(usage=(cli_help(EXECNAME) or __doc__), add_help_option=False) parser.add_option('-v', '--version', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('-h', '--help', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('-d', dest='directory', help=optparse.SUPPRESS_HELP) parser.add_option('-l', dest='logdir', help=optparse.SUPPRESS_HELP) parser.add_option('-q', '--quiet', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('-R', dest='revert', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('-c', '--check-only', dest='checkonly', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('--debug', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-disable-checkschema', dest='nocheckschema', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-command', dest='command', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-pid', dest='pid', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-dirs', dest='datadirs', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-filespaces', dest='fse', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-option', dest='option', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-option2', dest='option2', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-method', dest='method', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-oldversion', dest='oldversion', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-newversion', dest='newversion', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-isupgrade', dest='upgrade', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('--internal-mirrors', dest='mirrors', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('--skip-checkcat', dest='skipcheckcat', action='store_true', help=optparse.SUPPRESS_HELP) parser.add_option('--fault-injection', dest='faultinjection', type='int', help=optparse.SUPPRESS_HELP) (options, args) = parser.parse_args() if options.version: print EXECNAME + ' ' + __version__ sys.exit(0) if options.help: usage(EXECNAME) sys.exit(0) if len(args) != 2: usage(EXECNAME) msg = "incorrect number of arguments" if len(args) > 0: msg += ": %s" % str(args) parser.error(msg) except Exception, e: usage(EXECNAME) raise UpgradeError('Error parsing input: ' + str(e)) if options.revert: self.revert = True if options.checkonly: self.checkonly = True if options.directory: self.masterdir = options.directory if options.command: if options.command not in self.commands: parser.error('INVALID INTERNAL COMMAND: ' + options.command) self.cmd = options.command if options.pid: if self.cmd == 'MASTER': parser.error('INVALID INTERNAL COMMAND: ' + self.cmd) self.pid = int(options.pid) self.oldhome = args[0].rstrip('/') self.newhome = args[1].rstrip('/') if options.datadirs: self.datadirs = pickle.loads(base64.urlsafe_b64decode((urllib.unquote(options.datadirs)))) if options.fse: self.filespaces = pickle.loads(base64.urlsafe_b64decode((urllib.unquote(options.fse)))) if options.option: self.option = urllib.unquote(options.option) if options.option2: self.option2 = urllib.unquote(options.option2) if options.debug: self.debug = True enable_verbose_logging() if options.quiet: self.quiet = True quiet_stdout_logging() if options.logdir: self.logdir = options.logdir else: self.logdir = os.path.join(os.environ['HOME'], 'gpAdminLogs') # --internal_mirrors just indicates that the cluster has mirrors, # for simplicity we have self.mirrors still point to a list, but # its contents are not directly useful. if options.mirrors: self.mirrors = [True] try: setup_tool_logging(EXECNAME, unix.getLocalHostname(), unix.getUserName(), self.logdir) except OSError, e: logger.fatal('cannot log to %s: %s' % (self.logdir, str(e))) exit(1) if options.nocheckschema: self.checkschema = False if self.cmd != 'MASTER': self.oldversion = GpVersion(urllib.unquote(options.oldversion)) self.newversion = GpVersion(urllib.unquote(options.newversion)) self.method = options.method if options.upgrade: self.upgrade = True else: self.upgrade = False self.skipcheckcat = options.skipcheckcat self.faultinjection = options.faultinjection if self.cmd == 'MASTER': self.method = "transform" # Currently the only supported method today = date.today().strftime('%Y%m%d') if not os.path.isdir(self.logdir): os.makedirs(self.logdir, 0700) logname = os.path.join(self.logdir, '%s_%s.log' % (EXECNAME, today)) self.logfile = open(logname, 'a') self.pid = os.getpid() if self.masterdir == None: logger.fatal('MASTER_DATA_DIRECTORY parameter not set') raise UpgradeError('Initialization failed') if not os.path.exists(self.masterdir): logger.fatal('MASTER_DATA_DIRECTORY: %s not found' % self.masterdir) raise UpgradeError('Initialization failed') if not os.path.isabs(self.masterdir): self.masterdir = os.path.abspath(self.masterdir) if not os.path.exists(self.oldhome): logger.fatal(' directory not found: ' + self.oldhome) raise UpgradeError('Initialization failed') if not os.path.exists(self.newhome): logger.fatal(' directory not found: ' + self.newhome) raise UpgradeError('Initialization failed') if not os.path.isabs(self.oldhome): self.oldhome = os.path.abspath(self.oldhome) if not os.path.isabs(self.newhome): self.newhome = os.path.abspath(self.newhome) #------------------------------------------------------------ def Cleanup(self): ''' Cleanup open connections. Separate from __del__ because that caused weird issues with exit() behavior. ''' if self.cmd == 'MASTER': logger.fatal('Fatal error occurred - Recovering') try: self.Shutdown() except Exception, e: logger.fatal('Cleanup failure: ' + str(e)) try: self.ReleaseLockdown(self.oldenv) except Exception, e: logger.fatal('Cleanup failure: ' + str(e)) if self.pool: del self.pool #------------------------------------------------------------ def SetState(self, newstate): ''' Sets the state in the upgrader state file ''' # Set state across all hosts, if all hosts succeed then set the # master state if self.cmd == 'MASTER': self.CallSlaves('SETSTATE', newstate) try: self.state.write(str(newstate) + "\n") self.state.flush() except Exception, e: raise UpgradeError('Error writing to statefile: (%s)' % str(e)) else: # Build up a unique set of workdirectories on this host locations = set() for d in self.datadirs: (location, _) = os.path.split(d) locations.add(os.path.join(location, WORKDIR)) for d in locations: statefile = os.path.join(d, 'state') file = open(statefile, 'a') file.write(str(newstate) + "\n") file.close() #------------------------------------------------------------ def CallSlaves(self, cmd, option='', option2='', includeMirrors=False): ''' Calls every host to execute the given command with the given option value ''' logger.debug("Remote call: %s" % cmd) # Check for things that should never happen if self.cmd != 'MASTER': raise UpgradeError("Recursive communication error") if not self.array: raise UpgradeError("Failure initializing array") if not self.hostcache: raise UpgradeError("Failure initializing host cache") if not self.pool: raise UpgradeError("Failure initializing worker pool") if self.upgrade or self.revert: gphome = self.newhome else: gphome = self.oldhome # Construct the commands to pass to the worker pool hosts = self.hostcache.get_hosts() for host in hosts: hostname = host.hostname # Skip any hosts in the cache that contain no segments for this # configuration. if len(host.dbs) == 0: continue # Get the data directories for this host: datadirs = [] for seg in host.dbs: if includeMirrors or seg.isSegmentPrimary(): datadirs.append(seg.getSegmentDataDirectory()) fse = [] for seg in host.dbs: if includeMirrors or seg.isSegmentPrimary(): sfs = seg.getSegmentFilespaces(); for (oid, dir) in sfs.iteritems(): fse.append(dir) # Skip any hosts that have no applicable data directories if len(datadirs) == 0: continue cmdList = makeCommand(oldHome=self.oldhome, newHome=self.newhome, oldVersion=self.oldversion, newVersion=self.newversion, command=cmd, pid=self.pid, dataDirectories=datadirs, filespaces=fse, option1=option, option2=option2, method=self.method, isUpgrade=self.upgrade, isRevert=self.revert, mirrors=self.mirrors) c = GpUpgradeCmd("gpmigrator_mirror remote call", cmdList, ctxt=base.REMOTE, remoteHost=hostname) self.pool.addCommand(c) # Wait for the segments to finish try: self.pool.join() except: self.pool.haltWork() self.pool.joinWorkers() failure = False results = [] for cmd in self.pool.getCompletedItems(): r = cmd.get_results() # Going through the gppylib Command interface all stderr from the # remote calls gets redirected to stdout, which is unfortunate # because we'd like to be able to differentiate between the two. # # We keep the stdout chatter to a minimum by passing --quiet to # the remote calls which performs quiet stdout logging. # sys.stderr.write(r.stderr) msg = r.stdout.strip() results.append(msg) if not cmd.was_successful(): log_literal(logger, logging.ERROR, msg) failure=True if failure: raise UpgradeError("Fatal Segment Error") # Warning this output contains everything written to stdout, # which unfortunately includes some of the logging information return "\n".join(results) #------------------------------------------------------------ def CheckDirectories(self): if self.cmd == 'MASTER': fsdirs = self.masterfilespace else: fsdirs = self.filespaces err = False # Build up a unique set of workdirectories on this host locations = set() for d in fsdirs: (location, _) = os.path.split(d) locations.add(os.path.join(location, WORKDIR)) # Check that none of them exist for d in locations: if os.path.exists(d): logger.warn('%s : File exists' % d) err = True if err: raise UpgradeError('Directories exist') #------------------------------------------------------------ def Revert(self): ''' Revert to backup ''' # Check for indication that something is running: if not self.dbup: env = None try: env = self.CheckUp() except UpgradeError: pass # We use the catalog versions we cached during versionCheck to # determine if the MASTER_DATA_DIRECTORY is actually an old # master data directory, or a partially upgraded one. # # Note: it would be better to adjust gpstop so that it knows how # to stop old versions and always shutdown using a new environment. # # Note: It would also be good to avoid starting and stopping more # than necessary here. if env == self.oldenv and self.datacat == self.newcat: self.newenv['MASTER_DATA_DIRECTORY'] = \ self.oldenv['MASTER_DATA_DIRECTORY'] env = self.newenv if env: self.dbup = [env, False] # Now that we've determined what, if anything, is running, we must # stop it. try: if self.faultinjection == -1: raise UpgradeError("faultinjection=%d" % self.faultinjection) self.Shutdown() self.CheckDown(self.revert) except BaseException, e: logger.fatal('***************************************') logger.fatal(str(e)) logger.fatal('***************************************') raise UpgradeError("Unable to REVERT\nShutdown the running " + "database and rerun gpmigrator with '-R'") logger.info('Reverting to Backup') self.revert = True statefile = os.path.join(self.workdir, 'state') if not os.path.exists(statefile): logger.warn(" '%s' not found" % statefile) logger.info('Revert aborted') sys.exit(0) state = self.RunCmd('cat ' + statefile) needrevert = False for line in state.split('\n'): if line == 'BACKUP_VALID': needrevert = True hba = os.path.join(self.masterdir, 'pg_hba.conf'+LOCKEXT) ident = os.path.join(self.masterdir, 'pg_ident.conf'+LOCKEXT) if os.path.exists(hba) or os.path.exists(ident): needunlock = True else: needunlock = False if (needrevert or needunlock): try: self.ReadInfo() except UpgradeError, e: logger.fatal(str(e)) logger.info('Revert aborted') sys.exit(1) # Attempt revert if needrevert: logger.info('Restoring directories') self.DeepLink('REVERT') # Truncate the statefile # + Must occur AFTER actual Revert is processed open(statefile, 'w').truncate() if needunlock: self.ReleaseLockdown(self.oldenv) self.Startup(self.oldenv) logger.info('Removing upgrade user') self.Update('DROP USER IF EXISTS ' + MIGRATIONUSER) self.Shutdown() # Cleanup anything leftover from gpupgrademirror # + Should occur when the database is STOPPED # + This forces a full remirroring of the system again, we # should avoid this whenever possible. logger.info('Performing Mirror rollback') cmd = '%s/sbin/gpupgrademirror.py --rollback' % self.newhome self.RunCmd(cmd, env=self.newenv) # done logger.info('Revert Successful') self.revert = False #------------------------------------------------------------ def CheckVersions(self): ''' Validates that the upgrade from old->new is okay ''' # Log the OS type info os_type = os.uname()[0] os_ver = os.uname()[2] logger.info('Operating System: %s %s' % (os_type, os_ver)) # Log version checking logger.info('Checking version compatibility') # If we got a version, but it isn't recognized by the GpVersion class # then it is likely not a Greenplum database. oldversion = self.getversion(self.oldhome, self.oldenv) newversion = self.getversion(self.newhome, self.newenv) try: oldversion = GpVersion(oldversion) except: raise UpgradeError('Source not a Greenplum Database: ' + oldversion) try: newversion = GpVersion(newversion) except: raise UpgradeError('Target not a Greenplum Database: ' + newversion) logger.info('Source Version: (Greenplum Database) %s' % str(oldversion)) logger.info('Target Version: (Greenplum Database) %s' % str(newversion)) self.oldversion = oldversion self.newversion = newversion self.upgrade = (newversion >= oldversion) is_supported_version(oldversion, self.upgrade) is_supported_version(newversion, self.upgrade) if newversion == oldversion: raise UpgradeError("Greenplum Database is already version '%s'" % str(newversion)) elif newversion.isVersionRelease(oldversion): raise UpgradeError("Upgrade not needed to go from version '%s' to version '%s'" % (str(oldversion), str(newversion))) # We don't support downgrade if not self.upgrade: raise UpgradeError("Downgrade to Greenplum Database %s not supported from gpmigrator" % str(newversion)) if self.upgrade and not newversion.isVersionCurrentRelease(): main = GpVersion('main') raise UpgradeError( "Upgrade from '%s' to '%s' not supported. Target version should be Greenplum Database %s" % (str(oldversion), str(newversion), main.getVersionRelease())) logger.debug('Using %s method for migration' % self.method) # Compare old version catalog number with the catalog in # MASTER_DATA_DIRECTORY catalog_re = re.compile('Catalog version number: *(.*)') oldcat = self.RunCmd('postgres --catalog-version', env=self.oldenv) logger.info("Source %s" % oldcat) newcat = self.RunCmd('postgres --catalog-version', env=self.newenv) logger.info("Target %s" % newcat) # May need this info later self.oldcat = oldcat self.newcat = newcat control = self.RunCmd('pg_controldata ' + self.masterdir, env=self.oldenv) for line in control.split('\n'): m = catalog_re.match(line) if m: self.datacat = line logger.info("Data %s " % line) if ((not self.revert and line != oldcat) or (self.revert and line != oldcat and line != newcat)): logger.debug('catalog mismatch: expected %s, found %s' % (oldcat, line)) msg = 'Catalog in %s does not match source binary' % self.masterdir raise UpgradeError(msg) break # For the moment everything goes: # - Additional checks will be made once the old database is up logger.info('Versions are compatible') #------------------------------------------------------------ def RemoveDirectories(self, force=False): ''' Removes upgrade/ and backup/ directories on all segments ''' # If force is set then we remove the directories regardless of # whether we created them or they were created by another process. # If force is not set then we will throw an exception when we are # done if there were directories that we failed to remove. doerror = False if self.cmd == 'MASTER': logger.info('Removing temporary directories') self.CallSlaves('RMDIR', force, includeMirrors=True) fsdirs = self.masterfilespace else: fsdirs = self.filespaces # Build up a unique set of workdirectories on this host locations = set() for d in fsdirs: (location, _) = os.path.split(d) locations.add(os.path.join(location, WORKDIR)) for d in filter(os.path.exists, locations): flag = os.path.join(d, 'flag') if (not force) and os.path.exists(flag): try: dpid = self.RunCmd('cat ' + flag) dpid = int(dpid) except Exception, e: logger.warn('Read error on file: ' + str(e)) doerror = True continue if dpid != self.pid: logger.warn('Directory %s owned by pid %s' % (d, dpid)) doerror = True continue # Regardless of the setting of force, if there is an active # process running, we won't remove the directory. running = self.RunCmd('find %s -name postmaster.pid -print' % d) if (running): logger.info(running) logger.fatal('Active processes running') doerror = True else: self.RunCmd('rm -rf ' + d) # We hold off throwing the error until the end, because we want to make # sure we removed all of our own directories. if doerror: raise UpgradeError('Failed to remove old upgrade directories') #------------------------------------------------------------ def CreateDirectories(self): ''' Creates the upgrade/ and backup/ directories on all segments ''' if self.cmd == 'MASTER': dirs_exist = False try: self.CheckDirectories() self.CallSlaves('CHKDIR', includeMirrors=True) except UpgradeError, e: dirs_exist = True if dirs_exist: self.RemoveDirectories(force=True) logger.info('Creating temporary directories') # Have the slaves create their directories self.CallSlaves('MKDIR', includeMirrors=True) fsdirs = self.masterfilespace else: fsdirs = self.filespaces if self.cmd != 'MKDIR': raise Exception(self.cmd + ' != MKDIR]') # Given all the actual segment datadirs we need to look one # directory up for the datadir locations and create the gpmigrator # subdirectory there. locations = set() for d in fsdirs: if not os.path.exists(d): raise UpgradeError('Missing Data Directory: %s' % d) (location, _) = os.path.split(d) workpath = os.path.join(location, WORKDIR) locations.add(workpath) # Actually create the directory, none of the should exist yet! for d in locations: # Creating directories is an atomic operation, if mkdir # succeeds then it did not exist previously making it ours # This prevents mulitiple upgrades running simultaneously. os.mkdir(d) self.RunCmd('chmod 700 %s' % d) # Claim it in the name of our parent process. We make use of # this flag during tear down to confirm that we created it. flag = open(os.path.join(d, 'flag'), 'w') flag.write('%s\n' % str(self.pid)) flag.close() # Create subdirectories for /upgrade and /backup upgradepath = os.path.join(d, UPGRADEDIR) os.mkdir(upgradepath) self.RunCmd('chmod 700 %s' % upgradepath) backuppath = os.path.join(d, BACKUPDIR) os.mkdir(backuppath) self.RunCmd('chmod 700 %s' % backuppath) if (self.cmd == 'MASTER'): # Finally now that we have those directories we'll make a state file self.state = open('%s/state' % self.workdir, 'w') #------------------------------------------------------------ def GenerateConfigFiles(self): ''' Creates 'gp_databases' 'gp_array_config' and 'gp_config' files ''' logger.info('Generating config files') # Write a dump of the databases hash db_file = os.path.join(self.workdir, 'gp_databases') o = open(db_file, 'w') for (oid, name) in self.dbs.iteritems(): o.write('%s:%s\n' % (oid, name)) o.close() # Write a dump of the gparray object array_file = os.path.join(self.workdir, 'gp_array_config') self.array.dumpToFile(array_file) # Write our new gp_config file, this contains the configuration # information not captured in gp_array_config. config_file = os.path.join(self.workdir, 'gp_config') o = open(config_file, 'w') for item, value in self.config.iteritems(): if type(value) == int: o.write('%s=%d\n' % (str(item), value)) else: o.write('%s=%s\n' % (str(item), str(value))) o.close() logger.info('Configuration files generated') #------------------------------------------------------------ def ReadInfo(self): ''' When restarting an upgrade this method reads the configuration information from the cached status files. It corresponds directly with ExtractInfo() which gathers the same information from an active database. It is also the inverse of GenerateConfigFiles() which creates the files that this function reads. In normal processing usually ExtractInfo() is called rather than ReadInfo(). This function is used /instead/ when we are doing Revert() processing, or when resuming an upgrade. E.g. cases when we do not want to start the database to get the information we need because the database may not be in a stable state. ''' # self.array - configuration information array_config = os.path.join(self.workdir, 'gp_array_config') if not os.path.exists(array_config): raise UpgradeError(" '%s' not found" % array_config) self.array = GpArray.initFromFile(array_config) # self.dbs - database information db_file = os.path.join(self.workdir, 'gp_databases') if not os.path.exists(db_file): raise UpgradeError(" '%s' not found" % db_file) f = open(db_file, 'r') for line in f: (oid, name) = line.split(':', 1) self.dbs[oid] = name.strip() f.close() # self.hostcache - cache of hostnames self.hostcache = GpHostCache(self.array, self.pool) failed_pings = self.hostcache.ping_hosts(self.pool) if len(failed_pings) > 0: raise UpgradeError( "Cannot upgrade while there are unreachable hosts") # Setup the master filespace dir self.masterfilespace = [] for (id, loc) in self.array.master.getSegmentFilespaces().iteritems(): self.masterfilespace.append(loc) # self.primaries and self.mirrors dbs = self.array.getSegDbList() self.segments = filter(GpDB.isSegmentPrimary, dbs) self.mirrors = filter(GpDB.isSegmentMirror, dbs) # We don't support mirror any more in this script. if (self.mirrors): raise UpgradeError( "Please use gpmigrator_mirror to upgrade a cluster with mirroring") # self.config config_file = os.path.join(self.workdir, 'gp_config') if not os.path.exists(config_file): raise UpgradeError(" '%s' not found" % config_file) f = open(config_file, 'r') for line in f: (key, value) = line.split('=', 1) self.config[key] = value f.close() #------------------------------------------------------------ def ExtractInfo(self): ''' Get all the information we need from the old instance ''' # Can only extract with a running database env = self.CheckUp() # -- Get the array information logger.info('Extracting configuration information') port = self.masterport user = env['USER'] url = dbconn.DbURL(port=port, dbname='template1', username=user) array = GpArray.initFromCatalog(url, utility=True) self.array = array # -- Determine if there are any invalid segments logger.info('Checking that all segments are valid') invalid = array.get_invalid_segdbs() if len(invalid) > 0: for db in invalid: logger.fatal('INVALID SEGMENT: %s:/%s' % (db.getSegmentHostName(), db.getSegmentDataDirectory())) raise UpgradeError('Cannot upgrade database with invalid segments') # -- Get a list of available databases, note this includes "template0"! logger.info('Creating list of databases') databases = self.Select("SELECT oid, datname from pg_database") for row in databases: self.dbs[str(row['oid'])] = row['datname'] # -- Look for an existing user named MIGRATIONUSER logger.info('Checking gpmigrator user') upgradeuser = self.Select( "SELECT count(*)::numeric FROM pg_user " + "WHERE usename = '%s'" % MIGRATIONUSER) if upgradeuser[0] > 0: logger.warn("Database user '%s' already exists" % MIGRATIONUSER) self.Update('DROP USER ' + MIGRATIONUSER) if (not self.upgrade): raise UpgradeError("Downgrade not supported") # -- Older releases didn't have hostname as part of configuration # make sure we have an acurate list of address->host lookups. logger.info('Validating hosts') self.hostcache = GpHostCache(self.array, self.pool) for host in self.hostcache.get_hosts(): for seg in host.dbs: seg.setSegmentHostName(host.hostname) failed_pings = self.hostcache.ping_hosts(self.pool) if len(failed_pings) > 0: raise UpgradeError("Cannot upgrade while there are unreachable " "hosts") # Setup the master filespace dirs self.masterfilespace = [] for (id, loc) in self.array.master.getSegmentFilespaces().iteritems(): self.masterfilespace.append(loc) master = array.master standbymaster = array.standbyMaster # Oddly gparray doesn't have methods for fetching primary segments # specifically dbs = array.getSegDbList() self.segments = filter(GpDB.isSegmentPrimary, dbs) self.mirrors = filter(GpDB.isSegmentMirror, dbs) # We don't support mirror any more in this script. if (self.mirrors): raise UpgradeError( "Please use gpmigrator_mirror to upgrade a cluster with mirroring") # Internal sanity checking if len(self.segments) == 0: raise UpgradeError('No segments found') if standbymaster > 0: raise UpgradeError('Cannot upgrade while standbymaster is running') # To upgrade mirrors from 3.3 format mirrors to 4.0 format mirrors we # must have been provided with the replication ports. if len(self.mirrors) > 0: if (not self.config['REPLICATION_PORT_BASE'] or not self.config['MIRROR_REPLICATION_PORT_BASE'] or not self.mirrormode): logger.fatal("System configured with mirrors:") if not self.config['REPLICATION_PORT_BASE']: logger.fatal(" missing option: " "--replication_port_base=") if not self.config['MIRROR_REPLICATION_PORT_BASE']: logger.fatal(" missing option: " "--mirror_replication_port_base=") if not self.mirrormode: logger.fatal(" missing option: --mirror-mode") raise UpgradeError("Missing required upgrade options") if self.mirrormode not in ('redundant', 'single'): logger.fatal("Unrecognised mirror-mode: %s" % self.mirrormode) logger.fatal("Valid modes are: redundant, single") raise UpgradeError("Invalid mirror-mode") # Setup the replication ports for all segments in the array by host for host in self.hostcache.get_hosts(): # The port ranges we are assigning into p_port = self.config['REPLICATION_PORT_BASE'] + 1 m_port = self.config['MIRROR_REPLICATION_PORT_BASE'] + 1 # Used to determine overlapping port ranges pri_port = [65535, 0] # [min, max] mir_port = [65535, 0] pri_rep_port = [65535, 0] mir_rep_port = [65535, 0] for seg in host.dbs: port = seg.getSegmentPort() if seg.isSegmentPrimary(): seg.setSegmentReplicationPort(p_port) pri_port[0] = min(pri_port[0], port) pri_port[1] = max(pri_port[1], port) pri_rep_port[0] = min(pri_rep_port[0], p_port) pri_rep_port[1] = max(pri_rep_port[1], p_port) p_port += 1 else: seg.setSegmentReplicationPort(m_port) mir_port[0] = min(mir_port[0], port) mir_port[1] = max(mir_port[1], port) mir_rep_port[0] = min(mir_rep_port[0], m_port) mir_rep_port[1] = max(mir_rep_port[1], m_port) m_port += 1 # Check for overlapping port ranges: port_mat = [pri_port, mir_port, pri_rep_port, mir_rep_port] label_mat = ["Primary", "Mirror", "Replication", "Mirror-Replication"] for x in range(0, 4): for y in range(x+1, 4): if ((port_mat[x][0] >= port_mat[y][0] and port_mat[x][0] <= port_mat[y][1]) or (port_mat[x][1] >= port_mat[y][0] and port_mat[x][1] <= port_mat[y][1])): logger.error("Port collision on host %s" % host.hostname) logger.error("... %s port range: [%d-%d]" % (label_mat[x], port_mat[x][0], port_mat[x][1])) logger.error("... %s port range: [%d-%d]" % (label_mat[y], port_mat[y][0], port_mat[y][1])) raise UpgradeError("Port collision on host %s" % host.hostname) # Check for unsupported index types. found = False logger.info("Validating indexes") oids = sorted(self.dbs.keys()) for dboid in oids: db = self.dbs[dboid] if db == "template0": continue indexes = self.Select(''' SELECT quote_ident(n.nspname) || '.' || quote_ident(c.relname) as index, m.amname as kind FROM pg_class c join pg_namespace n on (c.relnamespace = n.oid) join pg_am m on (c.relam = m.oid) WHERE c.relkind = 'i' and m.amname in ('gin', 'hash') ''', db=db) if len(indexes) > 0: if found == False: logger.fatal("Unable to upgrade the following indexes") found = True logger.fatal(" Database: %s" % db) for row in indexes: logger.fatal(" %s [%s]" % (row['index'], row['kind'])) if found: raise UpgradeError("Deprecated index types must be removed prior " "to upgrade") # Check for SQL_ASCII database encoding logger.info("Validating database encoding") encoding = self.Select("SELECT datname FROM pg_database WHERE encoding=0") if len(encoding) > 0: logger.error("Deprecated database encodings found:") for datname in encoding: logger.error(" %s [SQL_ASCII]" % datname) raise UpgradeError("Deprecated database encodings found - contact " "Greenplum Support") # ARRAY_NAME arrayname = self.Select('SELECT gpname FROM gp_id') self.config['ARRAY_NAME'] = arrayname[0] # Determine PORT_BASE, (min of segments - 1) port_base = array.get_min_primary_port() - 1 self.config['PORT_BASE'] = port_base # Determine MIRROR_PORT_BASE, if len(self.mirrors) > 0: mirror_port_base = array.get_min_mirror_port() - 1 self.config['MIRROR_PORT_BASE'] = mirror_port_base # MASTER_HOSTNAME, super easy self.config['MASTER_HOSTNAME'] = master.getSegmentHostName() # MASTER_DIRECTORY, also easy - we already set it up self.config['MASTER_DIRECTORY'] = self.newmaster # MASTER_PORT, super easy self.config['MASTER_PORT'] = master.getSegmentPort() # TRUSTED_SHELL, set manually since there's only one supported value self.config['TRUSTED_SHELL'] = 'ssh' # CHECK_POINT_SEGMENTS self.config['CHECKPOINT_SEGMENTS'] = \ self.Select('show checkpoint_segments')[0] # ENCODING self.config['CLIENT_ENCODING'] = \ self.Select('show client_encoding')[0] # LOCALE self.config['LOCALE'] = \ self.Select('show lc_ctype')[0] # MAX_CONNECTIONS self.config['MAX_CONNECTIONS'] = \ self.Select('show max_connections')[0] logger.info('Configuration acquired') def copy_rewrite_hba(self, frm, to, upgrdfmt): ''' Between 3.2 and 3.3 we have a change in pg_hba.conf. The differences are: - "ident sameuser" becomes "ident" - "ident " becomes "ident map=" - "ldap "ldap url" becomes "ldap ldapserver=..." - "pam servicename" becomes "pam pamservice="servicename"" ''' f = open(frm, 'r') t = open(to, 'w') logger.debug('writing %s to %s for upgrade = %s' % (frm, to, str(upgrdfmt))) for line in f: # translate to new pg_hba.conf format if upgrdfmt: # translate from 3.2 format => 3.3 format: if self.oldversion < '3.3.0.0' and self.newversion >= '3.3.0.0': # ident line = re.compile('ident sameuser$').sub('ident', line) line = re.compile('ident (?!map=)(.*)$').sub('ident map=\\1', line) # pam line = re.compile('pam (?!pamservice=)(.*)$').sub('pam pamservice="\\1"', line) # ldap is a little more complex # our objective is to parse the following: # ldap[s]://[:]/[;prefix[;suffix]] logger.debug("upgrading ldap entries") r1 = re.compile('ldap "?(ldap[s]?)://([^:/]+)(:\d+)?/([^"]*)"?') match = r1.search(line) if match: s = "ldapserver=" + match.group(2).strip() if match.group(3): s += " ldapport=" + str(match.group(3)[1:]).strip() if match.group(4): fixes = match.group(4).strip().split(';') suffix = "" prefix = "" basedn = "" if len(fixes) > 3: raise UpgradeError('cannot rebuild ldap auth ' + 'entry: ' + line) if len(fixes) == 3: suffix = ' ldapsuffix="' + fixes[2] + '"' if len(fixes) >= 2: prefix = ' ldapprefix="' + fixes[1] + '"' if len(fixes) >= 1: basedn = "" # not used s += prefix + suffix if match.group(1) == "ldaps": s += " ldaptls=1" line = re.compile('ldap (.*)$').sub('ldap ' + s, line) else: # downgrade if self.newversion < '3.3.0.0' and self.oldversion >= '3.3.0.0': line = re.compile('ident$').sub('ident sameuser', line) line = re.compile('ident map=(.*)$').sub('ident \\1', line) # logger.debug('writing to %s line %s' % (to, line)) t.write(line) f.close() t.close() def move_rewrite_hba(self, frm, to, upgrade): self.copy_rewrite_hba(frm, to, upgrade) os.unlink(frm) #------------------------------------------------------------ def SetupLockdown(self): ''' Change pg_hba.conf to disallow access to everyone except the upgrade user. ''' os_type = os.uname()[0].lower() if self.cmd == 'MASTER': logger.info('Locking database') self.CheckUp() [env, utility] = self.dbup; if utility: raise UpgradeError("Cannot lock database in utility mode") logger.info('... Creating upgrade user') self.Update('DROP USER IF EXISTS ' + MIGRATIONUSER) self.Update('CREATE USER %s with superuser' % MIGRATIONUSER) logger.info('... Creating pg_hba.conf ') # To write out a new pg_hba.conf we need to know all the ip # addresses that the master identifies with. # # If we re-enable segment locking then this will need to be # passed to the segments as well. if os_type == 'sunos': cmd = 'ifconfig -a inet' else: cmd = 'ifconfig' ifconf = self.RunCmd(cmd) ipv4_re = re.compile('inet (?:addr:)?(\d+\.\d+\.\d+\.\d+)') ipv6_re = re.compile('inet6 (?:addr: )?([a-zA-Z0-9:\.]+[a-zA-Z0-9:])') ip = [] for line in ifconf.split('\n'): m = ipv4_re.search(line) if m: ip.append(m.group(1)) m = ipv6_re.search(line) if m: ip.append(m.group(1)) # Currently we only lock the MASTER node # # locking down the segments requires having gpstart # pass the PGUSER to the segments otherwise the segments # don't have the permissions to startup. # # This leaves a "hole" in the lockdown that still allows # utility connections to the segments. hba = os.path.join(self.masterdir, 'pg_hba.conf') ident = os.path.join(self.masterdir, 'pg_ident.conf') if env == self.oldenv or not os.path.exists(hba + LOCKEXT): # pre-upgrade database makes a backup of files and then # writes a new one # Solaris doesn't support ident authentication if os_type != 'sunos': self.RunCmd('mv -f %s %s' % (ident, ident+LOCKEXT)) file = open(ident, 'w') file.write('%s %s %s\n' % (MIGRATIONUSER, self.user, MIGRATIONUSER)) file.write('%s %s %s\n' % (self.user, self.user, self.user)) file.close() self.RunCmd('mv -f %s %s' % (hba, hba+LOCKEXT)) file = open(hba, 'w') if os_type == 'sunos': file.write('local all %s trust\n' % MIGRATIONUSER) file.write('local all %s trust\n' % self.user) else: file.write('local all %s ident map=%s\n' % (MIGRATIONUSER, MIGRATIONUSER)) file.write('local all %s ident map=%s\n' % (self.user, self.user)) for addr in ip: cidr_suffix = '/128' if ':' in addr else '/32' # MPP-15889 file.write('host all %s %s%s trust\n' % (MIGRATIONUSER, addr, cidr_suffix)) file.write('host all %s %s%s trust\n' % (self.user, addr, cidr_suffix)) file.close() if env == self.newenv: # upgrade database just copies all locking information # from the pre-upgrade database which should already be # locked dir = self.newmaster self.copy_rewrite_hba(hba, os.path.join(dir, 'pg_hba.conf'), self.upgrade) self.RunCmd('cp -f %s %s' % (ident, dir)) # Grab the original versions to (if any) if os.path.exists(hba + LOCKEXT): self.copy_rewrite_hba(hba+LOCKEXT, os.path.join(dir, 'pg_hba.conf'+LOCKEXT), self.upgrade) if os.path.exists(ident + LOCKEXT): self.RunCmd('cp -f %s %s' % (ident+LOCKEXT, dir)) # With the new lockfiles in place force the server to reload logger.info('... Syncing') logger.debug('calling gpstop -u with env: %s' % str(env)) self.RunCmd('gpstop -u -a -l %s' % self.logdir, env=env) logger.info('Database Locked') # Check if there are any other sessions connected. Since we # have modified the pg_hba.conf and run gpstop -u no new sessions # will be allowed, but any that were already connected still # need to be booted. active = self.Select("SELECT count(*) FROM pg_stat_activity") if active[0] > 1: [env, utility] = self.dbup logger.info('Active sessions detected, restarting server') self.Shutdown() self.Startup(env, utility) def CheckClusterReady(self, databases, port): ''' Pre-flight checks ''' for db in databases: xact = self.Select("SELECT count(*)::numeric FROM " + "pg_prepared_xacts", db=db['dbname'], port=port); if xact[0] > 0: raise UpgradeError("Database contains prepared transactions"); #------------------------------------------------------------ def ReleaseLockdown(self, env): ''' Revert pg_hba.conf to pre-lockdown state ''' # Two main cases: # 1) Release lockdown on old environment # 2) Release lockdown on newly installed environment # Both cases have the database installed under MASTER_DATA_DIRECTORY # so we actually treat them exactly the same if self.cmd == 'MASTER': # nothing to do if we haven't setup an array object yet. if not self.array: return logger.info('Unlocking database') # See comment about disabling lockdown on segments in SetupLockdown() # self.CallSlaves('UNLOCK') datadirs = [self.masterdir] else: datadirs = self.datadirs for dir in datadirs: hba = os.path.join(dir, 'pg_hba.conf') ident = os.path.join(dir, 'pg_ident.conf') if os.path.exists(hba + LOCKEXT): forward = False if self.upgrade and env == self.newenv: forward = True elif not self.upgrade and env == self.oldenv: forward = True self.move_rewrite_hba(hba+LOCKEXT, hba, forward) if os.path.exists(ident + LOCKEXT): self.RunCmd('mv -f %s %s' % (ident+LOCKEXT, ident)) # if we don't think the database is up, double check! if not self.dbup: env = None try: env = self.CheckUp() except UpgradeError: pass if env: self.dbup = [env, False] # Re-source the conf files if the database is up. if self.dbup: self.RunCmd('gpstop -u -l %s' % self.logdir, env=env) # MPP-10107 - The server is still running with PGUSER set in its # environment, which is wrong since that user doesn't exist. # Ideally that shouldn't matter, but unfortunately it does. We # deal with this by restarting the server. # # Note: the step above is still required because otherwise we # can't even run this gpstop due to pg_hba.conf lock. # # A better solution would be to have gpstart make a constrained # environment when it starts and stops the server, but that is # more than a one-line change, and this solves this case with # minimal impact to anything else. self.RunCmd('gpstop -ral %s' % self.logdir, env=env) # We would love to actually delete the user, but it's not actually # safe to do so here because we unlock the source database before we # are done with the upgrade. return #------------------------------------------------------------ def ResetXLogs(self): ''' Resets the xlog prior to recreating the schema ''' if self.cmd == 'MASTER': logger.info('Syncing XLogs') datadirs = [self.masterdir] self.CallSlaves('RESETXLOG') else: datadirs = self.datadirs # We should always have at least one data directory... if len(self.datadirs) < 1: return # Check the mapping in all data directories for olddir in datadirs: (d, seg) = os.path.split(olddir) newdir = os.path.join(d, WORKDIR, UPGRADEDIR, seg) # Links are in place, but data will not be visible until # we sync the pg_control info oldsync = self.RunCmd('pg_resetxlog -n ' + olddir, env=self.oldenv) newsync = self.RunCmd('pg_resetxlog -n ' + newdir, env=self.newenv) logger.debug('*****************************') logger.debug(' - old xlog -') logger.debug(oldsync) logger.debug('*****************************') logger.debug(' - new xlog -') logger.debug(newsync) logger.debug('*****************************') # Nasty bit of code to turn the control data into a # nice dict object sync = {} for line in oldsync.split('\n'): if len(line) == 0: continue (field, value) = line.split(':') sync[field] = value.lstrip() # Set the next transaction id in new control x = sync["Latest checkpoint's NextXID"].find('/') if x < 0: nextxid = sync["Latest checkpoint's NextXID"] else: nextxid = sync["Latest checkpoint's NextXID"][x+1:] self.RunCmd('pg_resetxlog -f -x %s %s' % (nextxid, newdir), env=self.newenv) # Set the WAL archives self.RunCmd('pg_resetxlog -l %s,%s,%s %s' % (sync["Latest checkpoint's TimeLineID"], sync["Current log file ID"], sync["Next log file segment"], newdir), env=self.newenv) # Set next oid self.RunCmd('pg_resetxlog -o %s %s' % (sync["Latest checkpoint's NextOID"], newdir), env=self.newenv) # Set next multitransaction ID/offset self.RunCmd('pg_resetxlog -m %s %s' % (sync["Latest checkpoint's NextMultiXactId"], newdir), env=self.newenv) self.RunCmd('pg_resetxlog -O %s %s' % (sync["Latest checkpoint's NextMultiOffset"], newdir), env=self.newenv) # Replace the transaction logs with the old ones # since the old logs actually correspond to the data for f in ['pg_clog', 'pg_distributedlog', 'pg_subtrans', 'pg_multixact']: oldlogs = os.path.join(olddir, f) newlogs = os.path.join(newdir, f) # the only one that wouldn't exist is pg_distributedlog # and only if we were upgrading/downgrading to/from a # version < 3.1.1.5 if (os.path.exists(newlogs)): self.RunCmd('rm -rf %s' % newlogs) if (os.path.exists(oldlogs)): self.RunCmd('cp -rf %s %s' % (oldlogs, newlogs)) #------------------------------------------------------------ def DeepLink(self, style): ''' Makes a deep hard link copy of one directory into another ''' if style == 'CLEANUP': return if style not in ['BACKUP', 'INSTALL', 'REVERT', 'SKELETON']: raise UpgradeError('FATAL', ' DeepLink(%s)!?' % style) if self.cmd == 'MASTER': fsdirs = self.masterfilespace if style == 'BACKUP': logger.info('Linking backup') logger.info('... This may take a while') includeMirrors = True elif style == 'INSTALL': logger.info('Linking upgrade') logger.info('... This may take a while') includeMirrors = True elif style == 'SKELETON': logger.info('Linking skeleton') logger.info('... This may take a while') includeMirrors = False else: logger.info('Reverting to backup') logger.info('... This may take a while') includeMirrors = True # Have all hosts perform the link first self.CallSlaves('DEEPLINK', style, includeMirrors=includeMirrors) else: fsdirs = self.filespaces for dir in fsdirs: (location, content) = os.path.split(dir) # For Backup fsdirs is the source, for INSTALL it's the target if style == 'BACKUP': source = os.path.join(location, content) destination = os.path.join(location, WORKDIR, BACKUPDIR, content) if style == 'SKELETON': source = os.path.join(location, content) destination = os.path.join(location, WORKDIR, UPGRADEDIR, content) if style == 'INSTALL': source = os.path.join(location, WORKDIR, UPGRADEDIR, content) destination = os.path.join(location, content) if style == 'REVERT': source = os.path.join(location, WORKDIR, BACKUPDIR, content) destination = os.path.join(location, content) if not os.path.exists(source): continue # Link the log file if os.path.isfile(source + '.log'): oldfile = source + '.log' newfile = destination + '.log' if os.path.exists(newfile): if style == 'BACKUP': logger.warn('File exists ' + newfile) os.remove(newfile) # logger.debug('Linking file %s => %s' % (oldfile, newfile)) os.link(oldfile, newfile) # Remove any files from the destination location # Not including the target directory itself # Also not including the workdir and contents #if os.path.exists(destination): # for file in os.listdir(destination): # If the file/directory also exists in the source # then delete it in anticipation of receiving the # replacement. # if os.path.exists(os.path.join(source, file)): # self.RunCmd('rm -rf ' + os.path.join(destination, file)) # if the source contains a gpperfmon/data directory we need to # check that it hasn't grown to ridiculous sizes. If it has # that implies that the perfmon is in a "runaway" state where # the server is creating files and nothing is consuming them. # in this case we should just delete the directory. # # Note: This is deleted in the SOURCE, it will NOT be restored # if the upgrade fails or reverts. This is deemed acceptable # because with this many files the system was almost certainly # misconfigured to begin with. Removing the directory will # prevent further acrual of files. gpperfdir = os.path.join(source, 'gpperfmon', 'data') if os.path.exists(gpperfdir): nlink = os.stat(gpperfdir).st_nlink # We define "too large" as >10000 files if nlink > 10000: logger.info('Performance Monitor directory contains ' '%s files - removing' % nlink) self.RunCmd('rm -rf ' + gpperfdir) # now walk the source directory and link every file back into # the target directory. for root, dirs, files in os.walk(source, topdown = True): if WORKDIR in dirs: dirs.remove(WORKDIR) # don't visit the work directory newroot = root.replace(source, destination, 1) if not os.path.exists(newroot): os.mkdir(newroot) self.RunCmd('chmod 700 ' + newroot) for name in files: oldfile = os.path.join(root, name) newfile = os.path.join(newroot, name) if os.path.exists(newfile): if style == 'BACKUP': logger.warn('File exists ' + newfile) os.remove(newfile) # logger.debug('Linking file %s => %s ' % (oldfile, newfile)) os.link(oldfile, newfile) #------------------------------------------------------------ def FixConfig(self): ''' After moving the upgraded db from DATA_DIR/upgrade to just DATA_DIR we need to go and fix the gp_configuration table to point to the correct location. ''' # We can do this in utility mode since the master host is the only # host that contains gp_configuration information logger.info('Adjusting configuration') if self.cmd == 'MASTER': self.Startup(self.newenv, utility=True) self.Update(""" UPDATE pg_filespace_entry SET fselocation = regexp_replace(fselocation, E'/gpmigrator/upgrade/','/') """, forceutility=True, upgradeMode=True, modSysTbl=True) self.Update("VACUUM FREEZE pg_filespace_entry", forceutility=True, upgradeMode=True, modSysTbl=True) self.Shutdown() self.Startup(self.newenv, upgrade=True) self.CallSlaves('FIXCONFIG') datadirs = [self.masterdir] else: datadirs = self.datadirs # Update the persistent table for each segment for dir in datadirs: # Find the last line in the conf file that matches our expression conf = os.path.join(dir, 'postgresql.conf') def getconf(x): conf_re = re.compile('^\s*%s\s*=\s*(\d+)' % x) try: conf_str = self.RunCmd('grep %s %s' % (x, conf)) except CmdError: conf_str = "" # grep returns errorcode on no match value = None for line in conf_str.split('\n'): match = conf_re.search(line) if match: value = int(match.group(1)) return value # Find the port for this segment: port = getconf('port') if port == None: raise UpgradeError('Could not determine port from %s/postgresql.conf' % dir) self.Update(""" UPDATE gp_persistent_filespace_node SET location_1 = regexp_replace(location_1, E'/gpmigrator/upgrade/','/')||' ' where persistent_state = 2 """, port=port, forceutility=True, upgradeMode=True, modSysTbl=True) if self.cmd == 'MASTER': self.Shutdown() self.newenv['MASTER_DATA_DIRECTORY'] = \ self.oldenv['MASTER_DATA_DIRECTORY'] #------------------------------------------------------------ def TouchupConfig(self): ''' Touch up the config for a transform method upgrade/downgrade ''' logger.info('Adjusting configuration') if self.cmd == 'MASTER': # Before we can start the master we need to tag it with a gp_dbid file. # We will tag all the other segment directories after re-mirroring. GpCreateDBIdFile.local('create master gp_dbid file', directory=self.newmaster, dbid=1) # Use upgrade mode to tell the backend code to do some trick, # since at this point the catalog has not yet updated so there # are some discrepancy between code and catalog. The code handles # that case by looking at gp_upgrade_mode flag. self.Startup(self.newenv, utility=True, upgrade=True) # Move datadirectory location to gpmigrator/upgrade subdirectories self.Update(""" UPDATE pg_filespace_entry SET fselocation = regexp_replace(fselocation, E'(/[^/]*)$', E'/gpmigrator/upgrade\\\\1') """, forceutility=True, upgradeMode=True, modSysTbl=True) self.Update("VACUUM FREEZE pg_filespace_entry", forceutility=True, upgradeMode=True) # Update the gp_segment configuration table to correctly fill in # the hostname and replication ports. for db in self.array.getDbList(): dbid = db.getSegmentDbId() hostname = db.getSegmentHostName() or db.getSegmentAddress() rport = db.getSegmentReplicationPort() or "null" self.Update(""" UPDATE gp_segment_configuration SET hostname='%s', replication_port=%s WHERE dbid=%s """ % (hostname, rport, dbid), forceutility=True, upgradeMode=True, modSysTbl=True) self.Update("VACUUM FREEZE gp_segment_configuration", forceutility=True, upgradeMode=True) self.Shutdown() self.Startup(self.newenv, upgrade=True) self.CallSlaves('TOUCHUPCONFIG') datadirs = [ self.masterdir ] else: datadirs = self.datadirs # Update the persistent table for each segment for dir in datadirs: # Find the last line in the conf file that matches our expression conf = os.path.join(dir, 'postgresql.conf') def getconf(x): conf_re = re.compile('^\s*%s\s*=\s*(\d+)' % x) try: conf_str = self.RunCmd('grep %s %s' % (x, conf)) except CmdError: conf_str = "" # grep returns errorcode on no match value = None for line in conf_str.split('\n'): match = conf_re.search(line) if match: value = int(match.group(1)) return value # Find the port for this segment: port = getconf('port') if port == None: raise UpgradeError('Could not determine port from %s/postgresql.conf' % dir) self.Update(""" UPDATE gp_persistent_filespace_node SET location_1 = regexp_replace(location_1, E'(/[^/]*) $', E'/gpmigrator/upgrade\\\\1') where persistent_state = 2 """, port=port, forceutility=True, upgradeMode=True, modSysTbl=True) if self.cmd == 'MASTER': self.Shutdown() #------------------------------------------------------------ def ExtractCatFiles(self): ''' Get the set of data files which are necessary to bring up a skeleton system: just those necessary for bringing up the system in utility mode and querying the catalogs -- no user data. We write the set of files to the file system for later use. ''' logger.info('Extracting list of catalog files') if self.cmd == 'MASTER': datadirs = [ self.masterdir ] self.CallSlaves('EXTRACTCATFILES') else: datadirs = self.datadirs for dir in datadirs: # The directory for this segment (d, seg) = os.path.split(dir) logdir = os.path.join(d, WORKDIR) # Open the mapping file mapfilename = os.path.join(logdir, '%s_catfiles' % seg) mapfile = open(mapfilename, 'w') # Find the last line in the conf file that matches our expression conf = os.path.join(dir, 'postgresql.conf') def getconf(x): conf_re = re.compile('^\s*%s\s*=\s*(\d+)' % x) try: conf_str = self.RunCmd('grep %s %s' % (x, conf)) except CmdError: conf_str = "" # grep returns errorcode on no match value = None for line in conf_str.split('\n'): match = conf_re.search(line) if match: value = int(match.group(1)) return value # Find the port for this segment: port = getconf('port') if port == None: raise UpgradeError('Could not determine port from %s/postgresql.conf' % dir) # Get a list of databases: # Ideally we would like switch to using self.dbs instead # of re-issuing this query, but that requires establishing # a method of passing the database dictionary to the slaves. databases = self.Select( ''' SELECT datname FROM pg_database where datname != 'template0' ORDER BY datname ''', port=port) # Build up the list of catalog entries files = {} for db in databases: l = self.Select( ''' SELECT d.oid as datoid, ts.spcfsoid, case when d.dattablespace = 1663 then '%s/base' else rtrim(fse.location_1)||'/'||(d.dattablespace::text) end || '/'||d.oid as catloc, c1.relname as rel1, c1.relfilenode as node1, c2.relname as rel2, c2.relfilenode as node2, c3.relname as rel3, c3.relfilenode as node3 FROM pg_class c1 left outer join pg_namespace n on (c1.relnamespace = n.oid) left outer join pg_class c2 on (c1.reltoastrelid = c2.oid) left outer join pg_class c3 on (c2.reltoastidxid = c3.oid), pg_database d, pg_tablespace ts, pg_filespace fs left outer join gp_persistent_filespace_node fse on (fs.oid = fse.filespace_oid) WHERE (nspname = 'pg_catalog' or nspname = 'pg_aoseg' or nspname = 'information_schema') and c1.relkind in ('r', 'i', 'o') and not c1.relisshared and d.dattablespace = ts.oid and ts.spcfsoid = fs.oid and d.datname = '%s'; ''' % (dir, db), port=port, db=db) first = True f = [] nlist = ["1", "2", "3"] for row in l: if first: first = False file = os.path.join(row.get("catloc"), 'PG_VERSION') f.append(file) for n in nlist: if row.get("rel" + n): dfile = os.path.join(row.get("catloc"), str(row['node' + n])) if not os.path.exists(dfile): raise UpgradeError( 'found path "%s" for table "%s" but file does not exist' % (dfile, row['rel'+n])) f.append(dfile) files[db] = f # make sure tblspc dir is empty pgtblspc = os.path.join(dir, 'pg_tblspc') if os.path.exists(pgtblspc): if os.listdir(pgtblspc): raise UpgradeError('cannot upgrade system using ' + 'tablespaces') # meta data directories, like commit log meta = ['global', 'pg_clog', 'pg_distributedlog', 'pg_distributedxidmap', 'pg_multixact', 'pg_subtrans', 'pg_twophase', 'pg_utilitymodedtmredo', 'pg_xlog', 'pg_tblspc', 'postgresql.conf', 'pg_hba.conf', 'pg_ident.conf', 'PG_VERSION'] for name, fs in files.iteritems(): for f in fs: # Prefix each name with the database it's in mapfile.write('db:%s\n' % f) # add template0 r = self.Select("""select oid from pg_database where datname = 'template0'""", port=port) if len(r) > 0: mapfile.write('dir:%s\n' % os.path.join('base', str(r[0]))) for f in meta: mapfile.write('meta:%s\n' % f) # files to just link, like SSL key files, backup # postgresql.conf, other stuff admins leave lying around nolink = ['pg_hba.conf', 'pg_ident.conf', 'pg_hba.conf'+LOCKEXT, 'pg_ident.conf'+LOCKEXT, 'postmaster.pid', 'postmaster.opts'] for f in os.listdir(dir): if (os.path.isfile(os.path.join(dir, f)) and f not in meta and f not in nolink): # logger.debug('adding link for file %s' % f) mapfile.write('link:%s\n' % f) mapfile.close() #------------------------------------------------------------ def ExtractAosegs(self): ''' Build AO seg upgrade/downgrade files for this node Also builds the persistent file upgrade/downgrade files for this node. ''' logger.info('Building append only segment upgrader') if self.cmd == 'MASTER': self.CallSlaves('EXTRACTAOSEGS') datadirs = [ self.masterdir ] else: datadirs = self.datadirs for dir in datadirs: (d, _) = os.path.split(dir) logdir = os.path.join(d, WORKDIR) # We want to do this once per directory location, if a previous # datadir already handled this we can skip doing it again. fname = os.path.join(logdir, 'aoseg_template0_rewrite.sql') if os.path.exists(fname): continue conf = os.path.join(dir, 'postgresql.conf') def getconf(x): conf_re = re.compile('^\s*%s\s*=\s*(\d+)' % x) try: conf_str = self.RunCmd('grep %s %s' % (x, conf)) except CmdError: conf_str = "" # grep returns errorcode on no match value = None for line in conf_str.split('\n'): match = conf_re.search(line) if match: value = int(match.group(1)) return value # Find the port for this segment: port = getconf('port') if port == None: raise UpgradeError('Could not determine port ' + 'from %s/postgresql.conf' % dir) # Get a list of databases: # Ideally we would like switch to using self.dbs instead # of re-issuing this query, but that requires establishing # a method of passing the database dictionary to the slaves. databases = self.Select('''select datname from pg_database where datname != 'template0';''', port=port) # do it for every database for db in databases: fname = os.path.join(logdir,'aoseg_%s_rewrite.sql' % db) logger.debug('creating AO rewrite: %s' % fname) f = open(fname, 'w') # The addition of eofuncompressed was 32 => 33 # # Because this version of the upgrader does not support upgrades # directly from 32 the old query is removed. # # See the version of gpmigrator that shipped with 33 for # details. f.close() fname = os.path.join(logdir, 'aoseg_template0_rewrite.sql') f = open(fname, 'w') f.write('vacuum freeze;\n') f.close() # upgrade to persistent tables: 33 => 40 # # Produce upgrade files for persistent tables: # - template1 must be processed first and handles the updates for # the shared tables # - the master is always called with gp_persistent_build_db(false) # - segments call gp_persistent_build_db(true) when there are # mirrors fname = os.path.join(logdir, 'persistent_master.sql') f = open(fname, 'w') f.write('select gp_persistent_build_db(false);\n') f.close() fname = os.path.join(logdir, 'persistent_segment.sql') f = open(fname, 'w') if len(self.mirrors) > 0: f.write('select gp_persistent_build_db(true);\n') else: f.write('select gp_persistent_build_db(false);\n') f.close() #------------------------------------------------------------ def BuildSkel(self): ''' Copy data files to build a skeleton database. ''' logger.info('Building skeleton cluster') if self.cmd == 'MASTER': # It would be nice to avoid having to make a full deep link, # but the gp_persistent creation requires reading in a bunch # of user tables to handle part of the ao conversion. #self.DeepLink('SKELETON') # Once that is done make a REAL copy (not hardlink) of the # actual catalog tables self.CallSlaves('BUILDSKEL', self.highestoid, self.gpperfmon) datadirs = [ self.masterdir ] else: datadirs = self.datadirs self.highestoid = int(self.option) self.gpperfmon = str(self.option2) if self.gpperfmon == 'None': self.gpperfmon = None def do_copy(src, dst, f): ''' Copy a directory tree ''' #logger.debug("copying %s to %s" % (os.path.join(src, f), # os.path.join(dst, f))) if os.path.isdir(os.path.join(src, f)): if os.path.exists(os.path.join(dst,f)): logger.debug('rmtree(%s)' % os.path.join(dst,f)) shutil.rmtree(os.path.join(dst,f)) logger.debug('cptree(%s)' % os.path.join(dst,f)) shutil.copytree(os.path.join(src, f), os.path.join(dst, f)) else: if os.path.exists(os.path.join(dst,f)): logger.debug('rm(%s)' % os.path.join(dst,f)) os.remove(os.path.join(dst,f)) logger.debug('cp(%s)' % os.path.join(dst,f)) shutil.copy2(os.path.join(src, f), os.path.join(dst, f)) # for this is a file, copy all segments if os.path.isfile(os.path.join(src, f)): for i in range(1, 66000): i = '.' + str(i) fseg = f + i if os.path.exists(os.path.join(src, fseg)): if os.path.exists(os.path.join(dst,fseg)): logger.debug('rm(%s)' % os.path.join(dst,fseg)) os.remove(os.path.join(dst,fseg)) logger.debug('cp(%s)' % os.path.join(dst,fseg)) shutil.copy2(os.path.join(src, fseg), os.path.join(dst, fseg)) else: break for src in datadirs: (d, seg) = os.path.split(src) dst = os.path.join(d, WORKDIR, UPGRADEDIR, seg) logdir = os.path.join(d, WORKDIR) # Open the mapping file mapfilename = os.path.join(logdir, '%s_catfiles' % seg) mapfile = open(mapfilename, 'r') # build list for directories to create dirs = [] for line in mapfile: line = line[:-1] (maptype, f) = line.split(':') # f is in the form of filespace/seg/tablespace/dbid/filenode if maptype is db # destination dir is filespace/WORKDIR/UPGRADEDIR/seg/tablespace/dbid if (maptype=='db'): (filespace, segid, tablespace, dbid, filenode) = f.rsplit("/", 4) path = os.path.join(filespace, WORKDIR, UPGRADEDIR, segid, tablespace, dbid) dirs.append(path) dirs = list(set(dirs)) # make unique logger.debug('dirs = %s' % str(dirs)) # make sure dst exists if not os.path.exists(dst): os.mkdir(dst, 0700) dirs.append(os.path.join(dst, 'pg_log')) dirs.append(os.path.join(dst, 'pg_changetracking')) for dir in dirs: logger.debug("making dir %s" % dir) if not os.path.exists(dir): os.makedirs(dir, 0700) logger.debug("copying files") # rewind mapfile.seek(0) for line in mapfile: (maptype, f) = line.split(':') f = f[:-1] if maptype == 'link': left = os.path.join(src, f) right = os.path.join(dst, f) logger.debug('linking %s to %s' % (left, right)) if os.path.exists(right): os.remove(right) os.link(left, right) elif maptype == 'meta': do_copy(src, dst, f) elif maptype == 'dir': do_copy(src, dst, f) else: (filespace, segid, tablespace, dbid, filenode) = f.rsplit("/", 4) srcdir = os.path.join(filespace, segid, tablespace, dbid) dstdir = os.path.join(filespace, WORKDIR, UPGRADEDIR, segid, tablespace, dbid) do_copy(srcdir, dstdir, filenode) def overwrite(frm, to): if not os.path.exists(frm): return f = open(frm, 'r') t = open(to, 'w') for line in f: t.write(line) f.close() t.close() # remove the two pg_hba.conf hardlinks from DeepLink('SKELETON') # otherwise they get in the way of the recreate logic below. if os.path.exists(os.path.join(dst, 'pg_hba.conf'+LOCKEXT)): os.remove(os.path.join(dst, 'pg_hba.conf'+LOCKEXT)) if os.path.exists(os.path.join(dst, 'pg_hba.conf')): os.remove(os.path.join(dst, 'pg_hba.conf')) # master needs locked pg_hba.conf if self.cmd == 'MASTER': self.copy_rewrite_hba(os.path.join(src, 'pg_hba.conf'+LOCKEXT), os.path.join(dst, 'pg_hba.conf'+LOCKEXT), self.upgrade) overwrite(os.path.join(src, 'pg_ident.conf'+LOCKEXT), os.path.join(dst, 'pg_ident.conf'+LOCKEXT)) self.copy_rewrite_hba(os.path.join(src, 'pg_hba.conf'), os.path.join(dst, 'pg_hba.conf'), self.upgrade) overwrite(os.path.join(src, 'pg_ident.conf'), os.path.join(dst, 'pg_ident.conf')) # Perform updates to the postgresql.conf file newconf = os.path.join(dst, 'postgresql.conf') oldconf = os.path.join(dst, 'postgresql.conf.bak') if os.path.exists(oldconf): os.remove(oldconf) shutil.move(newconf, oldconf) f = open(oldconf, 'r') t = open(newconf, 'w') foundPerfmon = False conf_re = re.compile(r"^\s*(\w+)\s*=\s*([^#]*)(.*)$") for line in f: m = conf_re.match(line) if m: setting = m.group(1) value = m.group(2) rest = m.group(3) # Comment out defunct gucs if setting in ('gp_fault_action'): t.write('#%s = %s # defunct setting\n' % (setting, value)) continue # Bump the value of max_resource_queues by one if setting in ('max_resource_queues'): value = str(int(value)+1) t.write('%s = %s %s\n' % (setting, value, rest)) continue # Make perfmon setting consistent for all segments if setting in ('gp_enable_gpperfmon'): if self.gpperfmon == None: t.write('#%s = %s %s\n' % (setting, value, rest)) else: t.write('%s = %s %s\n' % (setting, self.gpperfmon, rest)) foundPerfmon = True continue t.write(line) # If gp_enable_gpperfmon was not mentioned anywhere in the file # and the setting exists on the master then add the line. if (foundPerfmon == False and self.gpperfmon != None): t.write('gp_enable_gpperfmon = %s\n' % self.gpperfmon) f.close() t.close() # set highest oid # XXX: should check that this oid is higher! self.RunCmd("pg_resetxlog -o %d %s" % \ (self.highestoid, dst), env=self.oldenv) #------------------------------------------------------------ def TransformDbs(self): ''' Transform the databases. ''' logger.info('Performing catalog transformation') def transform(dbs, tfile): # MPP-10166 - because some of our transformations involve creates # of views with unstable oids we must make sure that we process # the databases in exactly the same order on every segment, so # sorting the oids is essential. oids = sorted(dbs.keys()) for dboid in oids: db = dbs[dboid] logger.info('... ' + db) transform_db(db, dboid, tfile) # Test failure during transformation if self.faultinjection == 1: raise UpgradeError("faultinjection=%d" % self.faultinjection) def transform_db(dbname, dboid, tfile): ''' Transform the specified database for a single database. ''' logger.debug("transforming db = %s dboid = %s tfile = %s" % \ (str(dbname), str(dboid), tfile)) # Execute the upgrade script cmd = ' '.join(["PGOPTIONS='-c gp_maintenance_conn=true'", 'psql', '-f', tfile, dbname, self.user]) logger.debug('running ' + cmd) p = subprocess.Popen(cmd, shell = True, close_fds = True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.newenv) result = p.communicate() if p.returncode == 0: logger.debug('successfully transformed ' + dbname) logger.debug(result[0].strip()) logger.debug(result[1].strip()) else: logger.debug(result[0]) raise CmdError(str(cmd), result[0], result[1]) # run the upgrade file runfile = "%s/share/postgresql/upgrade/upg2_catupgrade.sql" % self.newhome transform(self.dbs, runfile) #------------------------------------------------------------ def SetCatVersion(self, frm, to): ''' Set the catalog version to something different. Cluster should be down. ''' logger.info('Modifying catalog version of skeleton cluster') releases = {"3.0": "200703112", "3.1": "200712072", "3.2": "200808253", "3.3": "200905011", "4.0": "201005134", "4.1": "201101130", "4.2": "201109210"} self.CheckDown() def release2catverno(rno): if not rno in releases.keys(): raise Exception("unknown version %s" % rno) return releases[rno] def get_control_data(datadir): ''' Parse the output of pg_controldata run on data directory, returning catalog version and state ''' cmd = ' '.join(['pg_controldata', datadir]) p = subprocess.Popen(cmd, shell=True, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.newenv) result = p.communicate() if p.returncode != 0: raise CmdError(cmd, result[0], result[1]) out = result[0].strip() ver = "" state = "" for line in out.split('\n'): s = line.split(':') if s[0] == 'Catalog version number': ver = s[1].strip() elif s[0] == 'Database cluster state': state = s[1].strip() return [ver, state] def setcatversion(datadir, frm, to): ''' Set catalog version to 'to' from 'frm'. Make sure the database is already set to frm. We use gpmodcontrol to update pg_control file since in this version (4.3) we change the ControlFile structure. If only catalog_version_no needs to change, we could use gpmodcatversion as was previously. In case of gpmodcontrol (but not the case for gpmodcatversion), the newer environment is responsible to take care of downgrade too since older version does not have knowledge of the newer structure. ''' (ver, state) = get_control_data(datadir) frmcatverno = release2catverno(frm) if ver != frmcatverno: raise Exception("Expected version %s but found %s" % (frmcatverno, ver)) # It is an error if the current state is not "shut down", that # means that there may be pending xlog records which will cause # problems for the upgrade. if state != "shut down": raise UpgradeError("Cannot upgrade: Database did not shutdown cleanly") # gpmodcatversion is only in the 3.3 env so we must use that cmd = ['%s/bin/lib/gpmodcontrol' % self.newhome] if not self.upgrade: cmd.append('--downgrade') cmd.append(datadir) p = subprocess.Popen(cmd, shell=False, close_fds=True, env=self.newenv, # always newenv stdout=subprocess.PIPE, stderr=subprocess.PIPE) result = p.communicate() if p.returncode != 0: raise Exception("could not update catalog to %s" % to) if self.cmd == 'MASTER': datadirs = [ self.masterdir ] self.CallSlaves('SETCATVERSION') else: datadirs = self.datadirs for olddir in datadirs: (d, seg) = os.path.split(olddir) newdir = os.path.join(d, WORKDIR, UPGRADEDIR, seg) setcatversion(newdir, frm, to) #------------------------------------------------------------ def GetHighestOid(self): ''' Get the highest OID declared on the node, according to the control file. We use this number to reset all the control files in the new cluster we're building, so that all the new objects will be added with the same OID. Since we're not dispatching the SQL to create the objects, this is our only way to synchronise the OIDs. Database must be down. ''' logger.info('Looking for highest OID') self.CheckDown(self.revert) self.highestoid = 16384 # first normal OID oids = [] if self.cmd == 'MASTER': datadirs = [ self.masterdir ] out = self.CallSlaves('GETHIGHESTOID') for l in out.splitlines(): logger.debug("GETHIGHESTOID> '%s'" % l) a = l.split() if a[0] == "highestoid": oids.append(int(a[1])) else: datadirs = self.datadirs for dir in datadirs: cmd = "pg_controldata %s" % dir out = self.RunCmd(cmd, env=self.oldenv) for l in out.splitlines(): a = l.split(':') if a[0] == "Latest checkpoint's NextOID": oids.append(int(a[1])) logger.debug("oids: %s" % str(oids)) for o in oids: # XXX: what about overflow ? if o > self.highestoid: self.highestoid = o if self.cmd != 'MASTER': print "highestoid " + str(self.highestoid) logger.debug("highest oid found: %d" % self.highestoid) #------------------------------------------------------------ def StampDBId(self): ''' Populates the gp_dbid file for every segment ''' logger.info("Adding gp_dbid file") # Already handled the master during TouchupConfig() for seg in self.segments: c = GpCreateDBIdFile('create gp_dbid file', seg.getSegmentDataDirectory(), seg.getSegmentDbId(), ctxt=base.REMOTE, remoteHost=seg.getSegmentAddress()) self.pool.addCommand(c) for seg in self.mirrors: c = GpCreateDBIdFile('create gp_dbid file', seg.getSegmentDataDirectory(), seg.getSegmentDbId(), ctxt=base.REMOTE, remoteHost=seg.getSegmentAddress()) self.pool.addCommand(c) try: self.pool.join() except: self.pool.haltWork() self.pool.joinWorkers() failure = False for cmd in self.pool.getCompletedItems(): if not cmd.was_successful(): msg = cmd.get_results().stdout.strip() log_literal(logger, logging.ERROR, msg) failure = True if failure: raise UpgradeError("Fatal Segment Error") #------------------------------------------------------------ def Run(self): """ The main execution method - switches between submodes. The main frontend will execute the main PerformUpgrade() function, during which it will execute CallSlaves with various backend operations. For each one of the main backend operations we will spawn the backend executables, this switches between each of these operations. """ # Execution on the master if self.cmd == 'MASTER': self.PerformUpgrade() # The rest of the options are executed on remote hosts elif self.cmd == 'SETSTATE': self.SetState(self.option) elif self.cmd == 'CHKDIR': try: self.CheckDirectories() except UpgradeError, e: if str(e) == 'Directories exist': sys.exit(1) raise e elif self.cmd == 'CHKDOWN': self.CheckDown(self.revert) elif self.cmd == 'LOCKDOWN': self.SetupLockdown() elif self.cmd == 'UNLOCK': self.ReleaseLockdown(None) elif self.cmd == 'MKDIR': self.CreateDirectories() elif self.cmd == 'RMDIR': if self.option == 'False': self.RemoveDirectories(False) else: self.RemoveDirectories(True) elif self.cmd == 'RESETXLOG': self.ResetXLogs() elif self.cmd == 'DEEPLINK': self.DeepLink(self.option) elif self.cmd == 'EXTRACTCATFILES': self.ExtractCatFiles() elif self.cmd == 'BUILDSKEL': self.BuildSkel() elif self.cmd == 'TOUCHUPCONFIG': self.TouchupConfig() elif self.cmd == 'FIXCONFIG': self.FixConfig() elif self.cmd == 'SETCATVERSION': self.SetCatVersion(self.oldversion.getVersionRelease(), self.newversion.getVersionRelease()) elif self.cmd == 'TRANSFORMCAT': self.TransformDbs() elif self.cmd == 'EXTRACTAOSEGS': self.ExtractAosegs() elif self.cmd == 'GETHIGHESTOID': self.GetHighestOid() else: raise Exception('Unknown cmd: ' + str(self.cmd)) if self.pool: t = self.pool self.pool = None del t #------------------------------------------------------------ def PerformUpgrade(self): """ This is the main method of the upgrader which controls the overallflow of the upgrade. """ logger.info('gpmigrator version ' + __version__) logger.info('Python version %d.%d.%d %s %d' % sys.version_info) # Check that we can do the upgrade between the versions self.CheckVersions() # User requested a revert to previous version if self.revert: self.Revert() sys.exit(0) # Read the statefile if we are trying to restart current_state = {} statefile = os.path.join(self.workdir, 'state') if os.path.exists(statefile): logger.info("Found previous upgrade") self.state = open(statefile, 'r') for line in self.state: tup = line.split(':', 1) if len(tup) == 1: current_state[line.strip()] = True else: current_state[tup[0].strip()] = tup[1].strip() self.state = open(statefile, 'a') # Be careful if the last upgrade died in an unstable state if 'BACKUP_VALID' in current_state: self.Revert() # Determine if it is safe to continue from where we left off, or # if we need to start over from the beginning. # 1) We only try to resume if we made it to NEWDB_VALID # 2) We only try to resume if the Latest checkpoint's NextXid # maches our expectations. if 'NEWDB_VALID' not in current_state: if os.path.exists(statefile): logger.info("... State not resumable, restarting upgrade") current_state = {} else: control_data = \ GpControlData.local('Get Control Data', self.masterdir) current_xid = control_data["Latest checkpoint's NextXID"] # We expect gpupgrade mirrors to bump the latest xid by exactly # one as it starts the database and extracts the configuration # information. old_xid = current_state['NEWDB_VALID'] expected_xid = current_state.get('REMIRROR_XID') logger.info("... Old XID = %s" % old_xid) logger.info("... New XID = %s" % current_xid) logger.info("... Expected XID = %s" % expected_xid) if current_xid not in [old_xid, expected_xid]: logger.info("... Latest checkpoint mismatch, restarting upgrade") current_state = {} else: logger.info("... Latest checkpoint matches, resuming upgrade") # -------- # Stage 1: # -------- # We have 1 main restart state which is the state of NEWDB_VALID. This # is the state we record right before we start remirroring. After the # first phase of remirroring we stop the script to allow for the # possibility of a zfs snapshot (which isn't really feasible before # remirroring due to the amount of data written during the remirroring # step). if 'NEWDB_VALID' in current_state: self.ReadInfo() else: # Test failure before we bring up the database if self.faultinjection == 1: raise UpgradeError("faultinjection=%d" % self.faultinjection) # Test failure due to a bad lockfile in /tmp # For this injection we are assuming that the segments are running # on the local machine and using port 50000, not necessarily true # but should be good enough for testing. if self.faultinjection == 2: # XXX try: env = self.CheckUp() self.dbup = [env, False] self.Shutdown() except: pass logger.fatal("faultinjection=%d" % self.faultinjection) open('/tmp/.s.PGSQL.50001.lock', 'w') # We always pull the current configuration from the old instance. # (It may be nice to rewrite to allow caching state information) shutdown = None lockfile = os.path.join(self.oldenv['MASTER_DATA_DIRECTORY'], 'pg_hba.conf'+LOCKEXT) if os.path.exists(lockfile): raise UpgradeError("migration lock file already exists at: %s" % lockfile) try: env = self.CheckUp(True) shutdown = False self.dbup = [env, False] if env == self.newenv: logger.fatal('Cannot prepare with running new db') except: shutdown = True self.Startup(self.oldenv) # Test failure after we bring up the database # - we should stop it again if self.faultinjection == 3: raise UpgradeError("faultinjection=%d" % self.faultinjection) # The way things are structured we will only issue selects against # the database that's "up" so mark the oldenv as running. self.ExtractInfo() # Create working directories on all hosts # + Requires configuration info collected in ExtractInfo # + Requires Execution on all hosts if 'CREATE_DIRS' not in current_state: self.CreateDirectories() self.SetState('BOOTSTRAP: ' + self.RunCmd('date +%Y%m%d:%H:%M:%S')) self.SetState('CREATE_DIRS') # Run the common pre-upgrade check if 'CHECKCAT' not in current_state: self.PreUpgradeCheck() self.SetState('CHECKCAT') # If we're in checkcat only mode, return here. Don't do the actual upgrade if self.checkonly: return # SetupLockdown: # + Requires the database to be up # + Creates the upgrade user # + Shuts the database down # + Adjusts hba_conf to only allow upgrade user access # + Database is left down self.SetupLockdown() # Test failure after we have locked the database # - we should unlock it if self.faultinjection == 4: raise UpgradeError("faultinjection=%d" % self.faultinjection) # Write configuration files # + Required for Revert() to work correctly if 'CONFIG_FILES' not in current_state: self.GenerateConfigFiles() self.SetState('CONFIG_FILES') if 'EXTRACTCATFILES' not in current_state: self.ExtractCatFiles() self.SetState('EXTRACTCATFILES') #if 'EXTRACTAOSEGS' not in current_state: # self.ExtractAosegs() # self.SetState('EXTRACTAOSEGS') # All information collected, shutdown old database # - We can't release the lockdown yet because we rely on copying # the lockdown files when building the upgraded version self.Shutdown() # Test failure # - unlock database if self.faultinjection == 5: raise UpgradeError("faultinjection=%d" % self.faultinjection) if 'GETHIGHESTOID' not in current_state: self.GetHighestOid() self.SetState('GETHIGHESTOID') # Note: Because the source array is already in a lockdown state # the skeleton we build will also automatically be in a lockdown # state, so there is no need to Explicitly lock it. if 'BUILDSKEL' not in current_state: self.BuildSkel() self.SetState('BUILDSKEL') if 'SETCATVERSION' not in current_state: sov = self.oldversion.getVersionRelease() snv = self.newversion.getVersionRelease() self.SetCatVersion(sov, snv) self.SetState('SETCATVERSION') # TouchupConfig() performs final modifications to the catalog # to get the configuration information in line with the upgrade. # This involves adjusting the replication ports, hostnames, # and other configuration information. if 'TOUCHUPCONFIG' not in current_state: self.TouchupConfig() self.SetState('TOUCHUPCONFIG') if 'TRANSFORMCAT' not in current_state: self.Startup(self.newenv, upgrade=True) self.TransformDbs() self.SetState('TRANSFORMCAT') self.Shutdown() # Test failure # - unlock database if self.faultinjection == 6: raise UpgradeError("faultinjection=%d" % self.faultinjection) # Release the lockdown on the old database self.ReleaseLockdown(self.oldenv) # Test failure # - unlock database if self.faultinjection == 7: raise UpgradeError("faultinjection=%d" % self.faultinjection) # We are now at the state where we have a fully upgraded version # but we haven't yet installed it or brought the mirrors in sync. # This is an important state because we may try to resume from # this point. control_data = \ GpControlData.local('Get Control Data', self.masterdir) current_xid = control_data["Latest checkpoint's NextXID"] self.SetState('NEWDB_VALID:%s' % str(current_xid)) logger.info("--------------------------------------------------") logger.info("Stage 1 complete") logger.info("--------------------------------------------------") # -------- # Stage 2: # -------- # We have no fully upgraded the existing old configuration into a # new configuration. # # The current state of the world is as follows # DATA_DIR/ = untouched original db # DATA_DIR/upgrade = fully upgraded new db # DATA_DIR/backup = empty except for the state and flag files. # # We haven't modified anything under the master directory yet. # If anything has gone wrong up to this point it is pretty much # safe to blow away everything under 'upgrade/' and 'backup/' # (The only thing to be careful about is that we shutdown the # new db before we remove the storage from under it.) # Test failure after releasing the lockdown if self.faultinjection == 8: raise UpgradeError("faultinjection=%d" % self.faultinjection) # Add the gp_dbid file to every segment # + Requires all mirror directories to exist if 'GP_DBID' not in current_state: self.StampDBId() self.SetState('GP_DBID') # To install the new db under the old master path we go through # pains to make it as robust as possible. This is done via a # combination of redundancy and hard links. # # => Make a deep hardlink copy of DATA_DIR -> DATA_DIR/backup if 'BACKUP_VALID' not in current_state: self.DeepLink('BACKUP') self.SetState('BACKUP_VALID') try: # Test failure after installing the backup # - Revert is necessary if self.faultinjection == 9: raise UpgradeError("faultinjection=%d" % self.faultinjection) # Test failure during Revert # - See Revert() if self.faultinjection == -1: raise UpgradeError("faultinjection=%d" % self.faultinjection) # -------- # Stage 3: # -------- # The current state of the world is as follows # DATA_DIR/ = contents unreliable # DATA_DIR/upgrade = fully upgraded new db # DATA_DIR/backup = untouched original db # => Make a deep hard link copy of DATA_DIR/upgrade -> DATA_DIR # => Adjust new pg_configuration to point to DATA_DIR self.DeepLink('INSTALL') self.FixConfig() # Test failure after installing the newdb # - Revert is necessary if self.faultinjection == 10: raise UpgradeError("faultinjection=%d" % self.faultinjection) # Everything upgraded and validated # + Mark install valid self.Startup(self.newenv) # Install the gp_toolkit schema # + Requires a fully running 4.0 system including dispatch # and thus must occur after remirroring. if 'POSTUPGRADE' not in current_state: self.PerformPostUpgrade() self.SetState('POSTUPGRADE') # VACUUM FREEZE template1, since we haven't already self.Update('VACUUM FREEZE;', db='template1') # Finally, drop the upgrade user: # - The migrator user shouldn't own anything, but if the # "dumprestore" method was ever used on this database # then it might. logger.info("Removing upgrade user"); try: self.Update('DROP USER ' + MIGRATIONUSER, forceUseEnvUser=True) except: logger.warn('Objects owned by gpmigrator reassigned to dba') admin = self.Select('''select rolname from pg_authid where oid = 10''', forceUseEnvUser=True)[0]; oids = sorted(self.dbs.keys()) for dboid in oids: db = self.dbs[dboid] if db == 'template0': continue self.Update('REASSIGN OWNED BY gpmigrator TO ' + admin, db, forceUseEnvUser=True) self.Update('DROP USER ' + MIGRATIONUSER) # Test failure after installing the newdb # - Revert is necessary # - gpmigrator user no longer exists if self.faultinjection == 11: raise UpgradeError("faultinjection=%d" % self.faultinjection) if self.warnings: logger.warn('***************************************') logger.warn('Warnings encountered during upgrade.') logger.warn('***************************************') # Release the lockdown self.ReleaseLockdown(self.newenv) except Exception, e: logger.fatal(str(e)) self.Revert() raise e # -------- # Stage 4: # -------- # The current state of the world is as follows # DATA_DIR/ = fully upgraded new db # DATA_DIR/upgrade = fully upgraded new db # DATA_DIR/backup = untouched original db # => Remove redundant DATA_DIR/upgrade # => Remove historical DATA_DIR/backup self.RemoveDirectories(force=True) # ----------------- # Upgrade Complete: # ----------------- # The current state of the world is as follows # DATA_DIR/ = fully upgraded new db logger.info("---------------------------------------------------") logger.info("Upgrade Successful") logger.info("---------------------------------------------------") logger.info("Please consult release notes for post-upgrade ") logger.info("instructions to complete environment configuration") logger.info("---------------------------------------------------") return #============================================================ if __name__ == '__main__': coverage = GpCoverage() coverage.start() # Create a new GPUpgrade - should never throw exception u = GPUpgrade() # parses and validates input try: u.Setup() except Exception, e: logger.fatal(str(e)) sys.exit(1) # Execute the main upgrade routine, the Run() function itself is # just a switch on the "cmd", which switches between the main # PerformUpgrade() function and all the various subcommands that # are upgraded by backend executions. try: u.Run() # Any error that can happen except KeyboardInterrupt: try: u.Cleanup() except Exception, e2: logger.fatal(str(e2)) logger.fatal('***************************************') logger.fatal('== Upgrade interrupted by user == <<<<< ') logger.fatal('***************************************') sys.exit(1) except ConnectionError, e: try: u.Cleanup() except Exception, e2: logger.fatal(str(e2)) logger.fatal('***************************************') logger.fatal(str(e)) logger.fatal('***************************************') sys.exit(1) except UpgradeError, e: try: u.Cleanup() except Exception, e2: logger.fatal(str(e2)) logger.fatal('***************************************') logger.fatal(str(e)) logger.fatal('***************************************') sys.exit(1) except CmdError, e: try: u.Cleanup() except Exception, e2: logger.fatal(str(e2)) logger.fatal('***************************************') logger.fatal('Error executing command:') logger.fatal(' ' + str(e.cmd.strip())) logger.fatal(' ' + str(e.stderr.strip())) logger.fatal('***************************************') sys.exit(1) # Shouldn't get any of these, if they occur it is probably a bug in # the upgrader. except Exception, e: try: u.Cleanup() except Exception, e2: logger.fatal(str(e2)) logger.fatal('***************************************') logger.fatal(str(e)) logger.fatal('***************************************') logger.fatal(traceback.format_exc()) sys.exit(1) finally: coverage.stop() coverage.generate_report() # This shouldn't need to be explicit, but if something goes wrong in # thread shutdown it forces the issue. sys.exit(0)