#!/usr/bin/env python
# Line too long - pylint: disable=C0301
# Invalid name - pylint: disable=C0103
#
# Copyright (c) EMC Inc 2010. All Rights Reserved.
#
from gppylib.mainUtils import SimpleMainLock, ExceptionNoStackTraceNeeded

import os
import sys
import signal
import socket
import time
import itertools

try:
    import pg

    from gppylib.commands.unix import *
    from gppylib.commands.gp import *
    from gppylib.commands import unix
    from gppylib.commands.pg import PgControlData
    from gppylib.gparray import *
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gplog import *
    from gppylib.db import dbconn
    from gppylib.userinput import *
    from gppylib.operations.startSegments import *
    from pgdb import DatabaseError
    from gppylib import gparray, gplog, pgconf, userinput, utils
    from gppylib.parseutils import line_reader, check_values, canonicalize_address
except ImportError, e:
    sys.exit('ERROR: Cannot import modules. Please check that you have sourced greenplum_path.sh. Detail: ' + str(e))

# constants
MAX_BATCH_SIZE = 128

GPDB_STOPPED = 1
GPDB_STARTED = 2
GPDB_UTILITY = 3

GP_MOVEMIRRORS_LOCK_PATH = "gpmovemirrors.lock"

description = ("""
Moves mirror segments in a pre-existing GPDB Array.
""")

EXECNAME = os.path.split(__file__)[-1]

_help = ["""
The input file should be a plain text file with the format:

  <old_address>|<old_port>|<old_data_dir> <new_address>|<new_port>|<new_data_dir>
""",
         """
An input file must be specified.
""",
         ]


# ----------------------- Command line option parser ----------------------
def parseargs():
    parser = OptParser(option_class=OptChecker,
                       description=' '.join(description.split()),
                       version='%prog version $Revision$')
    parser.setHelp(_help)
    parser.remove_option('-h')

    parser.add_option('-i', '--input', dest="input_filename",
                      help="input mirror configuration file.", metavar="FILE")
    parser.add_option('-d', '--master-data-directory', dest='master_data_directory',
                      help='The master data directory for the system. If this option is not specified, '
                           'the value is obtained from the $MASTER_DATA_DIRECTORY environment variable.')
    parser.add_option('-B', '--batch-size', dest='batch_size', type='int', default=16, metavar="<batch_size>",
                      help='Batch size for parallel operations. Valid values are 1-%d' % MAX_BATCH_SIZE)
    parser.add_option('-v', '--verbose', dest="verbose", action='store_true',
                      help='debug output.')
    parser.add_option('-l', '--log-file-directory', dest="logfile_directory",
                      help='The directory where the gpmovemirrors log file should be put. '
                           'The default location is ~/gpAdminLogs.')
    parser.add_option('-C', '--continue', dest='continue_move', action='store_true',
                      help='Continue moving mirrors')
    parser.add_option('--hba-hostnames', action='store_true', dest='hba_hostnames',
                      help='use hostnames instead of CIDR in pg_hba.conf')
    parser.add_option('-h', '-?', '--help', action='help',
                      help='show this help message and exit.')
    parser.add_option('--usage', action="briefhelp")

    parser.set_defaults(verbose=False, filters=[], slice=(None, None))

    # Parse the command line arguments
    (options, args) = parser.parse_args()

    if len(args) > 0:
        logger.error('Unknown argument %s' % args[0])
        parser.exit()

    if options.batch_size < 1 or options.batch_size > MAX_BATCH_SIZE:
        logger.error('Invalid argument. -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE)
        parser.print_help()
        parser.exit()

    if options.input_filename is None:
        logger.error('Missing argument. -i or --input is a required argument.')
        parser.print_help()
        parser.exit()

    try:
        if options.master_data_directory is None:
            options.master_data_directory = get_masterdatadir()
        options.gphome = get_gphome()
    except GpError, msg:
        logger.error(msg)
        parser.exit()

    if not os.path.exists(options.master_data_directory):
        logger.error('Master data directory does not exist.')
        parser.exit()

    options.pgport = int(os.getenv('PGPORT', 5432))

    return options, args


# -------------------------------------------------------------------------
def logOptionValues(options):
    """ Log the option values used for this invocation. """
    logger.info("Option values for this invocation of gpmovemirrors are:")
    logger.info("")
    logger.info(" --input = " + str(options.input_filename))
    logger.info(" --master-data-directory = " + str(options.master_data_directory))
    logger.info(" --batch-size = " + str(options.batch_size))
    if options.verbose is not None:
        logger.info(" --verbose = " + str(options.verbose))
    if options.continue_move is not None:
        logger.info(" --continue = " + str(options.continue_move))
    logger.info("")


# -------------------------------------------------------------------------
pool = None


def shutdown():
    """used if the script is closed abruptly"""
    logger.info('Shutting down gpmovemirrors...')
    if pool is not None:
        pool.haltWork()
        pool.joinWorkers()


# -------------------------------------------------------------------------
def gpmovemirrors_status_file_exists(master_data_directory):
    """Checks if a gpmovemirrors status file exists"""
    return os.path.exists(master_data_directory + '/gpmovemirrors.status')


# -------------------------------------------------------------------------
def sig_handler(sig, arg):
    shutdown()

    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)

    # raise sig
    os.kill(os.getpid(), sig)


# -------------------------------------------------------------------------
def lookupGpdb(address, port, dataDirectory):
    """ Look up the segment gpdb by address, port, and dataDirectory """
    gpdbList = gpArrayInstance.getDbList()
    for gpdb in gpdbList:
        if address == str(gpdb.getSegmentAddress()) and port == str(gpdb.getSegmentPort()) \
                and dataDirectory == str(gpdb.getSegmentDataDirectory()):
            return gpdb
    return None


# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
class Mirror:
    """ This class represents information about a mirror. """

    def __init__(self, address, port, dataDirectory, inPlace=False):
        self.address = address
        self.port = port
        self.dataDirectory = dataDirectory
        self.inPlace = inPlace

    def __str__(self):
        tempStr = "address = " + str(self.address) + '\n'
        tempStr = tempStr + "port = " + str(self.port) + '\n'
        tempStr = tempStr + "dataDirectory = " + str(self.dataDirectory) + '\n'
        return tempStr


# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
class Configuration:
    """
    This class represents the mirrors.
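
    Each line of the input file describes one mirror move as two
    whitespace-separated groups of the form <address>|<port>|<data_directory>:
    the existing (old) mirror followed by its new location, for example
    (hypothetical values):

        sdw1|50001|/data1/mirror/gpseg0 sdw2|50001|/data1/mirror/gpseg0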
""" def __init__(self): self.inputFile = None self.fileData = None self.oldMirrorList = [] self.newMirrorList = [] def _getParsedRow(self, lineno, line): groups = line.split() # NOT line.split(' ') due to MPP-15675 if len(groups) != 2: raise ExceptionNoStackTraceNeeded("expected 2 groups but found %d" % len(groups)) oldParts = groups[0].split('|') newParts = groups[1].split('|') if len(oldParts) != 3 or len(newParts) != 3: msg = "expected 3 parts in old and new groups, got %d and %d" % (oldParts, newParts) raise ExceptionNoStackTraceNeeded(msg) oldAddress, oldPort, oldDataDirectory = oldParts newAddress, newPort, newDataDirectory = newParts check_values(lineno, address=oldAddress, port=oldPort, datadir=oldDataDirectory) check_values(lineno, address=newAddress, port=newPort, datadir=newDataDirectory) return (oldAddress, oldPort, oldDataDirectory), (newAddress, newPort, newDataDirectory) def read_input_file(self, inputFile): self.inputFile = inputFile with open(inputFile) as f: for lineno, line in line_reader(f): try: (oldAddress, oldPort, oldDataDirectory), (newAddress, newPort, newDataDirectory) = \ self._getParsedRow(lineno, line) oldGpdb = lookupGpdb(oldAddress, oldPort, oldDataDirectory) if oldGpdb is None or oldGpdb.getSegmentRole() != ROLE_MIRROR: if oldGpdb is None: raise Exception( "Old mirror segment does not exist with given information: address = %s, port = %s, segment data directory = %s" % (oldAddress, str(oldPort), oldDataDirectory)) else: raise Exception( "Old mirror segment is not currently in a mirror role: address = %s, port = %s, segment data directory = %s" % (oldAddress, str(oldPort), oldDataDirectory)) inPlace = (oldAddress == newAddress and oldDataDirectory == newDataDirectory) if inPlace and oldPort == newPort: logger.warn("input file %s contained request to move a mirror with identical attributes (%s, %s, %s)" % (inputFile, newAddress, str(newPort), newDataDirectory)) oldMirror = Mirror(address=oldAddress , port=oldPort , dataDirectory=oldDataDirectory , inPlace=inPlace ) newMirror = Mirror(address=newAddress , port=newPort , dataDirectory=newDataDirectory ) self.oldMirrorList.append(oldMirror) self.newMirrorList.append(newMirror) except ValueError: raise ValueError('Missing or invalid value on line %d of file: %s.' % (line, inputFile)) except Exception, e: raise Exception('Invalid input file on line %d of file %s: %s' % (line, inputFile, str(e))) def write_output_file(self, filename, oldToNew=True): """ Write out the configuration in a format appropriate for use with the -i option. 
""" fp = open(filename, 'w') if oldToNew == True: firstList = self.oldMirrorList secondList = self.newMirrorList else: firstList = self.newMirrorList secondList = self.oldMirrorList for i in range(0, len(firstList)): oldEntry = firstList[i] newEntry = secondList[i] line = canonicalize_address(oldEntry.address) + \ "|" + str(oldEntry.port) + "|" + oldEntry.dataDirectory line = line + " " line = line + canonicalize_address(newEntry.address) + \ "|" + str(newEntry.port) + "|" + newEntry.dataDirectory line = line + '\n' fp.write(line) # ------------------------------------------------------------------------- # --------------------------------- main ---------------------------------- # ------------------------------------------------------------------------- gp_movemirrors = None remove_pid = True options = None args = None pidfilepid = None # pid of the process which has the lock locktorelease = None sml = None # sml (simple main lock) logger = get_default_logger() try: # setup signal handlers so we can clean up correctly signal.signal(signal.SIGTERM, sig_handler) signal.signal(signal.SIGHUP, sig_handler) options, args = parseargs() setup_tool_logging(appName=EXECNAME , hostname=getLocalHostname() , userName=getUserName() , logdir=options.logfile_directory ) enable_verbose_logging() mainOptions = {"pidlockpath": GP_MOVEMIRRORS_LOCK_PATH} sml = SimpleMainLock(mainOptions) otherpid = sml.acquire() if otherpid is not None: logger.error("Lockfile %s indicates that an instance of %s is already running with PID %s" % (sml.ppath, "gpmovemirrors", otherpid)) logger.error("If this is not the case, remove the lockfile directory at %s" % (sml.ppath)) sys.exit(1) logger.info("Invocation of gpmovemirrors mirrors") logOptionValues(options) dburl = dbconn.DbURL() # Get array configuration try: gpArrayInstance = GpArray.initFromCatalog(dburl, utility=True) except DatabaseError, ex: logger.error('Failed to connect to database. Make sure the') logger.error('Greenplum instance is running, and that') logger.error('your environment is correct, then rerun') logger.error('gpmovemirrors ' + ' '.join(sys.argv[1:])) sys.exit(1) """ Get the old to new mirror configuration. """ newConfig = Configuration() newConfig.read_input_file(options.input_filename) PgHbaEntriesToUpdate = [] """ Do some sanity checks on the input. 
""" for oldMirror, newMirror in itertools.izip(newConfig.oldMirrorList, newConfig.newMirrorList): seg = lookupGpdb(oldMirror.address, oldMirror.port, oldMirror.dataDirectory) if seg is None: raise Exception( "Old mirror segment does not exist with given information: address = %s, port = %s, segment data directory = %s" % (oldMirror.address, str(oldMirror.port), oldMirror.dataDirectory)) if seg.getSegmentContentId() == MASTER_CONTENT_ID: raise Exception( "Cannot move master or master mirror segments: address = %s, port = %s, segment data directory = %s" % (oldMirror.address, str(oldMirror.port), oldMirror.dataDirectory)) if seg.getSegmentRole() != ROLE_MIRROR: raise Exception( "Old mirror segment is not currently in a mirror role: address = %s, port = %s, segment data directory = %s" % (oldMirror.address, str(oldMirror.port), oldMirror.dataDirectory)) # exclude any in place mirrors, i.e if the host on which the mirror reside still stays same, in that # case we need not update the entries if not oldMirror.inPlace: for segment in gpArrayInstance.getDbList(): # identify the corresponding primary pair of the mirror being moved if segment.getSegmentContentId() == seg.getSegmentContentId() and segment.getSegmentRole() != seg.getSegmentRole(): PgHbaEntriesToUpdate.append((segment.getSegmentDataDirectory(), segment.getSegmentHostName(), newMirror.address)) """ Prepare common execution steps for running commands on segments """ mirrorsToDelete = [mirror for mirror in newConfig.oldMirrorList if not mirror.inPlace] totalDirsToDelete = len(mirrorsToDelete) numberOfWorkers = min(totalDirsToDelete, options.batch_size) """ Update pg_hba.conf on primary segments with new mirror information """ if numberOfWorkers > 0: pool = WorkerPool(numWorkers=numberOfWorkers) try: for primary_datadir, primary_hostname, newMirror_hostname in PgHbaEntriesToUpdate: # Start with an empty string so that the later .join prepends a newline to the first entry entries = [''] # Add the samehost replication entry to support single-host development entries.append("host replication {username} samehost trust".format(username=unix.getUserName())) if options.hba_hostnames: hostname, _, _ = socket.gethostbyaddr(newMirror_hostname) entries.append("host all {username} {hostname} trust".format(username=unix.getUserName(), hostname=hostname)) entries.append("host replication {username} {hostname} trust".format(username=unix.getUserName(), hostname=hostname)) else: newMirror_ips = unix.InterfaceAddrs.remote('get mirror ips', newMirror_hostname) for ip in newMirror_ips: cidr_suffix = '/128' if ':' in ip else '/32' cidr = ip + cidr_suffix entries.append("host all {username} {cidr} trust".format(username=unix.getUserName(), cidr=cidr)) new_mirror_if_addrs = IfAddrs.list_addrs(newMirror_hostname) for ip in new_mirror_if_addrs: cidr_suffix = '/128' if ':' in ip else '/32' cidr = ip + cidr_suffix entries.append("host replication {username} {cidr} trust".format(username=unix.getUserName(), cidr=cidr)) cmdStr = ". 
                cmdStr = ". {gphome}/greenplum_path.sh; echo '{entries}' >> {datadir}/pg_hba.conf; pg_ctl -D {datadir} reload".format(
                    gphome=os.environ["GPHOME"],
                    entries="\n".join(entries),
                    datadir=primary_datadir
                )
                logger.info("Updating pg_hba.conf entries on primary %s:%s with new mirror %s information"
                            % (primary_hostname, primary_datadir, newMirror_hostname))
                cmd = Command(name="append to pg_hba.conf", cmdStr=cmdStr, ctxt=REMOTE, remoteHost=primary_hostname)
                pool.addCommand(cmd)

            # Wait for the segments to finish
            pool.join()
        except:
            pool.haltWork()
            pool.joinWorkers()

        failure = False
        results = []
        for cmd in pool.getCompletedItems():
            r = cmd.get_results()
            if not cmd.was_successful():
                logger.error("Unable to update pg_hba.conf on primary segment: " + str(r))
                failure = True

        pool.haltWork()
        pool.joinWorkers()
        if failure:
            logger.error("Unable to update pg_hba.conf on the primary segments")
            raise Exception("Unable to update pg_hba.conf on the primary segments.")

    """
    Create a backout file for the user if they choose to go back to the original configuration.
    """
    backout_filename = options.input_filename + "_backout_" + str(time.strftime("%m%d%y%H%M%S"))
    newConfig.write_output_file(backout_filename, False)

    """
    Start gprecoverseg.
    """
    recoversegOptions = "-i " + newConfig.inputFile + " -v -a -d " + options.master_data_directory
    if options.logfile_directory is not None:
        recoversegOptions = recoversegOptions + " -l " + str(options.logfile_directory)
    logger.info('About to run gprecoverseg with options: ' + recoversegOptions)

    cmd = GpRecoverSeg("Running gprecoverseg", options=recoversegOptions)
    cmd.run(validateAfter=True)

    """
    Delete old mirror directories.
    """
    if numberOfWorkers > 0:
        pool = WorkerPool(numWorkers=numberOfWorkers)
        for mirror in mirrorsToDelete:
            logger.info("About to remove old mirror segment directory: " + mirror.dataDirectory)
            cmd = RemoveDirectory("remove old mirror segment directories", mirror.dataDirectory,
                                  ctxt=REMOTE, remoteHost=mirror.address)
            pool.addCommand(cmd)

        # Wait for the segments to finish
        try:
            pool.join()
        except:
            pool.haltWork()
            pool.joinWorkers()

        failure = False
        results = []
        for cmd in pool.getCompletedItems():
            r = cmd.get_results()
            if not cmd.was_successful():
                logger.error("Unable to remove old mirror segment directory: " + str(r))
                failure = True

        pool.haltWork()
        pool.joinWorkers()
        if failure:
            logger.error("Although the system is in the process of recovering the mirrors to their new location,")
            logger.error("there was an issue removing the old mirror segment directories.")
            raise Exception("Unable to complete removal of old mirror segment directories.")

except Exception, e:
    if options is not None and options.verbose:
        logger.exception("gpmovemirrors failed. exiting...")
    else:
        logger.error("gpmovemirrors failed: %s \n\nExiting..." % e)
    sys.exit(3)
except KeyboardInterrupt:
    # Disable SIGINT while we shutdown.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    shutdown()
    # Re-enable SIGINT
    signal.signal(signal.SIGINT, signal.default_int_handler)
    sys.exit('\nUser Interrupted')
finally:
    if sml is not None:
        sml.release()