#!/usr/bin/env python
# Line too long            - pylint: disable=C0301
# Invalid name             - pylint: disable=C0103
#
# Copyright (c) EMC Inc 2010. All Rights Reserved.
#
from gppylib.mainUtils import SimpleMainLock, ExceptionNoStackTraceNeeded

import copy
import datetime
import os
import sys
import socket
import signal
from time import strftime, sleep

# Project imports are guarded so that a missing Greenplum environment produces
# a readable hint instead of a traceback.  The wildcard imports are kept as
# they are: later code uses unqualified names that are not imported explicitly
# anywhere else in this file (presumably supplied by these modules).
try:
    from gppylib.commands.unix import *
    from gppylib.commands.gp import *
    from gppylib.commands.pg import PgControlData
    from gppylib.gparray import *
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gplog import *
    from gppylib.db import dbconn
    from gppylib.userinput import *
    from gppylib.operations.startSegments import *
    from pygresql.pgdb import DatabaseError
    from pygresql import pg
    from gppylib import gparray, gplog, pgconf, userinput, utils
    from gppylib.parseutils import line_reader, parse_filespace_order, parse_gpmovemirrors_line, \
        canonicalize_address

except ImportError as e:
    sys.exit('ERROR: Cannot import modules.  Please check that you have sourced greenplum_path.sh.  Detail: ' + str(e))

# constants

# Largest value accepted for the -B/--batch-size option.
MAX_BATCH_SIZE = 128

GPDB_STOPPED = 1
GPDB_STARTED = 2
GPDB_UTILITY = 3

# pid file name passed to SimpleMainLock by the main section of this script.
GP_MOVEMIRROS_PID_FILE = "gpmovemirrors.pid"

description = ("""
Moves mirror segments in a pre-existing GPDB Array.
""")

# Name of this script without its directory; used as the logging appName.
EXECNAME = os.path.split(__file__)[-1]

# Help sections handed to the option parser via parser.setHelp().
_help = ["""
The input file should be a plain text file with the format:

  filespaceOrder=[<filespace1_fsname>[:<filespace2_fsname>:...]]
  <old_address>:<port>:<system_filespace_location> <new_address>:<port>:<replication_port>:<system_filespace_location>[:<fselocation>:...]

""",
         """
         An input file must be specified.
         """,
         ]

# ----------------------- Command line option parser ----------------------
def parseargs():
    """Parse and validate the gpmovemirrors command line.

    Builds the option parser, parses the process arguments and checks that no
    positional arguments were given, that the batch size lies in
    1..MAX_BATCH_SIZE, that an input file was named and that the master data
    directory exists.

    Returns:
        The ``(options, args)`` pair from ``parser.parse_args()``.  On top of
        the declared options, ``options`` carries ``gphome`` and ``pgport``
        ($PGPORT, defaulting to 5432).

    Every validation failure is reported through the module-level ``logger``
    and ends in ``parser.exit()``.
    NOTE(review): ``parser.exit()`` is called without a status; with the
    standard optparse implementation that means exit status 0 - confirm that
    this is intended for error paths.
    """
    parser = OptParser(option_class=OptChecker,
                       description=' '.join(description.split()),
                       version='%prog version $Revision$')
    parser.setHelp(_help)
    # The stock -h is removed here and re-added below together with -?.
    parser.remove_option('-h')

    parser.add_option('-i', '--input', dest="input_filename",
                      help="input expansion configuration file.", metavar="FILE")
    parser.add_option('-d', '--master-data-directory', dest='master_data_directory',
                      help='The master data directory for the system. If this option is not specified, \
                            the value is obtained from the $MASTER_DATA_DIRECTORY environment variable.')
    parser.add_option('-B', '--batch-size', dest='batch_size', type='int', default=16, metavar="<batch_size>",
                      help='Expansion configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE)
    parser.add_option('-v', '--verbose', dest="verbose", action='store_true',
                      help='debug output.')
    parser.add_option('-l', '--log-file-directory', dest="logfile_directory",
                      help='The directory where the gpmovemirrors log file should be put. \
                            The default location is ~/gpAdminLogs.')
    parser.add_option('-C', '--continue', dest='continue_move', action='store_true',
                      help='Continue moving mirrors')
    parser.add_option('-h', '-?', '--help', action='help',
                      help='show this help message and exit.')
    parser.add_option('--usage', action="briefhelp")

    parser.set_defaults(verbose=False, filters=[], slice=(None, None))

    # Parse the command line arguments
    (options, args) = parser.parse_args()

    if len(args) > 0:
        logger.error('Unknown argument %s' % args[0])
        parser.exit()

    # Use the shared constant so the check, the help text and the error
    # message can never disagree about the upper bound.
    if not 1 <= options.batch_size <= MAX_BATCH_SIZE:
        logger.error('Invalid argument.  -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE)
        parser.print_help()
        parser.exit()

    if options.input_filename is None:
        logger.error('Missing argument. -i or --input is a required argument.')
        parser.print_help()
        parser.exit()

    try:
        if options.master_data_directory is None:
            options.master_data_directory = get_masterdatadir()
        options.gphome = get_gphome()
    except GpError as msg:
        logger.error(msg)
        parser.exit()

    if not os.path.exists(options.master_data_directory):
        logger.error('Master data directory does not exist.')
        parser.exit()

    options.pgport = int(os.getenv('PGPORT', 5432))

    return options, args


# -------------------------------------------------------------------------
def logOptionValues(options):
    """Write the option values of this invocation to the log.

    Args:
        options: parsed option values; ``input_filename``,
            ``master_data_directory``, ``batch_size``, ``verbose`` and
            ``continue_move`` are read.

    ``--verbose`` and ``--continue`` are only reported when they are not None.
    """
    logger.info("Option values for this invocation of gpmovemirrors are:")
    logger.info("")
    logger.info("  --input                 = " + str(options.input_filename))
    logger.info("  --master-data-directory = " + str(options.master_data_directory))
    logger.info("  --batch-size            = " + str(options.batch_size))
    if options.verbose is not None:
        logger.info("  --verbose               = " + str(options.verbose))
    if options.continue_move is not None:
        logger.info("  --continue              = " + str(options.continue_move))
    logger.info("")

# -------------------------------------------------------------------------
# Worker pool used by the main section; shutdown() halts it when it is set.
pool = None


def shutdown():
    """Used if the script is closed abruptly: stop the worker pool, if any."""
    logger.info('Shutting down gpmovemirrors...')
    if pool is not None:
        pool.haltWork()
        pool.joinWorkers()


# -------------------------------------------------------------------------
def gpmovemirrors_status_file_exists(master_data_directory):
    """Tell whether gpmovemirrors.status is present in the given directory."""
    status_path = master_data_directory + '/gpmovemirrors.status'
    return os.path.exists(status_path)


# -------------------------------------------------------------------------
def sig_handler(sig, arg):
    """Signal handler: run shutdown(), then deliver the signal again.

    Args:
        sig: number of the signal being handled.
        arg: second argument passed by the signal machinery (not used).
    """
    shutdown()

    # Put the default handlers back so the signal sent below is not routed
    # to this function again.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)

    # raise sig
    os.kill(os.getpid(), sig)


# -------------------------------------------------------------------------
def lookupGpdb(address, port, dataDirectory):
    """ Look up the segment gpdb by address, port, and dataDirectory.

    Args:
        address: segment address to match.
        port: segment port to match; compared as text, so an int and its
            string form are treated alike.
        dataDirectory: segment data directory to match.

    Returns:
        The first entry of ``gparray.getDbList()`` whose address, port and
        data directory all match, or None when there is no such entry.
    """
    # Normalise once: the catalog values are compared through str() as well.
    port = str(port)

    for gpdb in gparray.getDbList():
        if (address == str(gpdb.getSegmentAddress())
                and port == str(gpdb.getSegmentPort())
                and dataDirectory == str(gpdb.getSegmentDataDirectory())):
            return gpdb
    return None


# -------------------------------------------------------------------------
def lookupReplicatonPort(gpdb):
    """ Look up the segment's replication port associated with the input parameters.

    Args:
        gpdb: segment object, or None.

    Returns:
        ``gpdb.getSegmentReplicationPort()``, or None when ``gpdb`` is None.
    """
    if gpdb is None:
        return None
    return gpdb.getSegmentReplicationPort()


# -------------------------------------------------------------------------
def lookupFilespaces(gpdb):
    """ Look up the segment's filespaces, and return map of filespace name : location.

    Args:
        gpdb: segment object providing ``getSegmentFilespaces()``, a mapping
            keyed by filespace oid.

    Returns:
        A dict from filespace name (resolved with
        ``gparray.getFileSpaceName``) to location.  The entry keyed by
        SYSTEM_FILESPACE is left out.
    """
    retValue = {}
    gpdbFilespaceDict = gpdb.getSegmentFilespaces()

    for oid in gpdbFilespaceDict:
        if oid == SYSTEM_FILESPACE:
            continue
        retValue[gparray.getFileSpaceName(oid)] = gpdbFilespaceDict[oid]
    return retValue


# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
class Mirror:
    """ This class represents information about a mirror.

    It is a plain record: the constructor stores its arguments unchanged
    under attributes of the same name.
    """

    def __init__(self, address, port, replicationPort, dataDirectory, filespaces):
        self.address = address
        self.port = port
        self.replicationPort = replicationPort
        self.dataDirectory = dataDirectory
        # Mapping of filespace name -> location, indexed by name when the
        # configuration is written out.
        self.filespaces = filespaces

    def __str__(self):
        """Return a multi-line ``name = value`` listing of all attributes."""
        return '\n'.join([
            "address = " + str(self.address),
            "port = " + str(self.port),
            "replicationPort = " + str(self.replicationPort),
            "dataDirectory   = " + str(self.dataDirectory),
            "filespaces      = " + str(self.filespaces),
        ])


# -------------------------------------------------------------------------
# -------------------------------------------------------------------------
class Configuration:
    """ This class represents the mirrors.

    ``oldMirrorList`` and ``newMirrorList`` are parallel lists: entry *i* of
    one describes the current location of a mirror and entry *i* of the other
    its requested location.  ``filespaceNames`` is the filespace order taken
    from the first line of the input file.
    """

    def __init__(self):
        self.inputFile = None
        self.fileData = None
        self.filespaceNames = []
        self.oldMirrorList = []
        self.newMirrorList = []

    def read_input_file(self, inputFile):
        """Load the old -> new mirror mapping from ``inputFile``.

        The first line yielded by ``line_reader`` is parsed as the
        filespaceOrder entry; every later line describes one mirror move and
        appends one entry to ``oldMirrorList`` and one to ``newMirrorList``.

        Raises:
            Exception: when an old mirror cannot be found with
                ``lookupGpdb`` or is not in the mirror role.
        """
        self.inputFile = inputFile

        fslist = None
        with open(inputFile) as f:

            for lineno, line in line_reader(f):
                if fslist is None:
                    fslist = parse_filespace_order(inputFile, lineno, line)
                    self.filespaceNames = fslist
                    continue

                rowMap, newFilespaces = parse_gpmovemirrors_line(inputFile, lineno, line, fslist)

                oldAddress = rowMap['oldAddress']
                oldPort = rowMap['oldPort']
                oldDataDirectory = rowMap['oldDataDirectory']
                oldGpdb = lookupGpdb(oldAddress, oldPort, oldDataDirectory)
                if oldGpdb is None:
                    raise Exception(
                        "Old mirror segment does not exist with given information: address = %s, port = %s, segment data directory = %s"
                        % (oldAddress, str(oldPort), oldDataDirectory))
                if oldGpdb.getSegmentRole() != ROLE_MIRROR:
                    raise Exception(
                        "Old mirror segment is not currently in a mirror role: address = %s, port = %s, segment data directory = %s"
                        % (oldAddress, str(oldPort), oldDataDirectory))

                # The replication port and filespaces of the old mirror are
                # not in the input line, so they are looked up on the segment.
                self.oldMirrorList.append(Mirror(address=oldAddress,
                                                 port=oldPort,
                                                 replicationPort=lookupReplicatonPort(oldGpdb),
                                                 dataDirectory=oldDataDirectory,
                                                 filespaces=lookupFilespaces(oldGpdb)))

                self.newMirrorList.append(Mirror(address=rowMap['newAddress'],
                                                 port=rowMap['newPort'],
                                                 replicationPort=rowMap['newReplicationPort'],
                                                 dataDirectory=rowMap['newDataDirectory'],
                                                 filespaces=newFilespaces))

    def write_output_file(self, filename, oldToNew=True):
        """ Write out the configuration in a format appropriate for use with the -i option.

        Args:
            filename: path of the file to create or overwrite.
            oldToNew: when true each line maps old mirror -> new mirror;
                otherwise the direction is reversed (new -> old).
        """
        if oldToNew:
            firstList, secondList = self.oldMirrorList, self.newMirrorList
        else:
            firstList, secondList = self.newMirrorList, self.oldMirrorList

        # The context manager guarantees the file is flushed and closed, also
        # when building a line raises.
        with open(filename, 'w') as fp:
            # Set the first line up for the filespaceOrder entry.
            fp.write("filespaceOrder=" + ":".join(self.filespaceNames) + '\n')

            for oldEntry, newEntry in zip(firstList, secondList):
                source = canonicalize_address(oldEntry.address) + \
                         ":" + str(oldEntry.port) + ":" + oldEntry.dataDirectory
                target = [canonicalize_address(newEntry.address),
                          str(newEntry.port),
                          str(newEntry.replicationPort),
                          newEntry.dataDirectory]
                # Filespace locations follow in the order of the header line.
                target.extend(newEntry.filespaces[fsName] for fsName in self.filespaceNames)
                fp.write(source + " " + ":".join(target) + '\n')


# -------------------------------------------------------------------------
# --------------------------------- main ----------------------------------
# -------------------------------------------------------------------------
# Script body.  In order: install signal handlers, parse options, take the
# single-instance lock, read the array configuration and the input file,
# write a backout file, run gprecoverseg and finally remove the old mirror
# directories on their hosts.
gp_movemirrors = None
remove_pid = True
options = None
args = None
pidfilepid = None  # pid of the process which has the lock
locktorelease = None
sml = None  # sml (simple main lock)

try:

    # setup signal handlers so we can clean up correctly
    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGHUP, sig_handler)

    logger = get_default_logger()

    options, args = parseargs()

    setup_tool_logging(appName=EXECNAME
                       , hostname=getLocalHostname()
                       , userName=getUserName()
                       , logdir=options.logfile_directory
                       )

    enable_verbose_logging()

    mainOptions = {"pidfilename": GP_MOVEMIRROS_PID_FILE}
    sml = SimpleMainLock(mainOptions)
    otherpid = sml.acquire()
    if otherpid is not None:
        logger.error("An instance of %s is already running (pid %s)" % ("gpmovemirrors", otherpid))
        sys.exit(1)

    logger.info("Invocation of gpmovemirrors mirrors")
    logOptionValues(options)

    dburl = dbconn.DbURL()

    # Get array configuration
    # NOTE: this rebinds the module-level name ``gparray`` (imported above as
    # a module) to the loaded array object; the lookup* helpers read it.
    try:
        gparray = GpArray.initFromCatalog(dburl, utility=True)
    except DatabaseError:
        logger.error('Failed to connect to database.  Make sure the')
        logger.error('Greenplum instance is running, and that')
        logger.error('your environment is correct, then rerun')
        logger.error('gpmovemirrors ' + ' '.join(sys.argv[1:]))
        sys.exit(1)

    # Get the old to new mirror configuration.
    newConfig = Configuration()
    newConfig.read_input_file(options.input_filename)

    # Do some sanity checks on the input.
    for mirror in newConfig.oldMirrorList:
        seg = lookupGpdb(mirror.address, mirror.port, mirror.dataDirectory)
        if seg is None:
            raise Exception(
                "Old mirror segment does not exist with given information: address = %s, port = %s, segment data directory = %s"
                % (mirror.address, str(mirror.port), mirror.dataDirectory))
        if seg.getSegmentContentId() == MASTER_CONTENT_ID:
            raise Exception(
                "Cannot move master or master mirror segments: address = %s, port = %s, segment data directory = %s"
                % (mirror.address, str(mirror.port), mirror.dataDirectory))
        if seg.getSegmentRole() != ROLE_MIRROR:
            raise Exception(
                "Old mirror segment is not currently in a mirror role: address = %s, port = %s, segment data directory = %s"
                % (mirror.address, str(mirror.port), mirror.dataDirectory))

    # Create a backout file for the user if they choose to go back to the original configuration.
    # ``strftime`` is the name imported explicitly from ``time`` at the top of
    # the file, so this does not depend on a wildcard import exposing ``time``.
    backout_filename = options.input_filename + "_backout_" + strftime("%m%d%y%H%M%S")
    newConfig.write_output_file(backout_filename, False)

    # Start gprecoverseg.
    recoversegOptions = "-i " + newConfig.inputFile + " -v -a -d " + options.master_data_directory
    if options.logfile_directory is not None:
        recoversegOptions = recoversegOptions + " -l " + str(options.logfile_directory)
    logger.info('About to run gprecoverseg with options: ' + recoversegOptions)
    cmd = GpRecoverSeg("Running gprecoverseg", options=recoversegOptions)
    cmd.run(validateAfter=True)

    # Delete old mirror directories.
    fileSpacesObjList = gparray.getFilespaces(includeSystemFilespace=False)
    totalDirsToDelete = len(newConfig.oldMirrorList) + len(fileSpacesObjList)
    numberOfWorkers = min(totalDirsToDelete, options.batch_size)
    pool = WorkerPool(numWorkers=numberOfWorkers)
    for mirror in newConfig.oldMirrorList:
        logger.info("About to remove old mirror segment directory: " + mirror.dataDirectory)
        cmd = RemoveFiles("remove old mirror segment directories"
                          , mirror.dataDirectory
                          , ctxt=REMOTE
                          , remoteHost=mirror.address
                          )
        pool.addCommand(cmd)
        for filespaceName, filespaceLocation in mirror.filespaces.items():
            logger.info(
                "About to remove old mirror filespace directory: " + filespaceName + " : " + filespaceLocation)
            cmd = RemoveFiles("remove old mirror segment directories"
                              , filespaceLocation
                              , ctxt=REMOTE
                              , remoteHost=mirror.address
                              )
            pool.addCommand(cmd)

    # Wait for the segments to finish
    try:
        pool.join()
    except:
        # Kept deliberately broad: whatever interrupts the wait, the workers
        # are stopped and the results gathered so far are still reported.
        pool.haltWork()
        pool.joinWorkers()

    failure = False
    for cmd in pool.getCompletedItems():
        r = cmd.get_results()
        if not cmd.was_successful():
            # Report through the tool's logger, like every other message here.
            logger.error("Unable to remove old mirror segment directory: " + str(r))
            failure = True

    pool.haltWork()
    pool.joinWorkers()
    if failure:
        logger.error("Although the system is in the process of recovering the mirrors to their new location,")
        logger.error("There was an issue removing the old mirror segment directories.")
        raise Exception("Unable to complete removal of old mirror segment directories.")

except Exception as e:
    if options is not None and options.verbose:
        logger.exception("gpmovemirrors failed. exiting...")
    else:
        logger.error("gpmovemirrors failed: %s \n\nExiting..." % e)
    sys.exit(3)

except KeyboardInterrupt:
    # Disable SIGINT while we shutdown.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    shutdown()

    # Re-enabled SIGINT
    signal.signal(signal.SIGINT, signal.default_int_handler)

    sys.exit('\nUser Interrupted')

finally:
    # Runs on every exit path, including the sys.exit() calls above.
    if sml is not None:
        sml.release()