#!/usr/bin/env python
# Line too long            - pylint: disable=C0301
# Invalid name             - pylint: disable=C0103
#
# Copyright (c) Greenplum Inc 2010. All Rights Reserved.
#
# Note: the option to recover to a new host is not very good if we have a multi-home configuration
#
# Options removed when 4.0 gprecoverseg was implemented:
#        --version
#       -S "Primary segment dbid to force recovery": I think this is done now by bringing the primary down, waiting for
#           failover, and then doing recover full
#       -z "Primary segment data dir and host to force recovery" see removed -S option for comment
#       -f        : force Greenplum Database instance shutdown and restart
#       -F (HAS BEEN CHANGED) -- used to mean "force recovery" and now means "full recovery)
#
# import mainUtils FIRST to get python version check
# THIS IMPORT SHOULD COME FIRST
from gppylib.mainUtils import *

L
Larry Hamel 已提交
21
from optparse import OptionGroup
T
Tyler Ramer 已提交
22
import os, sys, signal, time
23 24


L
Larry Hamel 已提交
25
from gppylib import gparray, gplog, userinput, utils
26
from gppylib.util import gp_utils
27 28
from gppylib.commands import gp, pg, unix
from gppylib.commands.base import Command, WorkerPool
29
from gppylib.db import dbconn
30 31 32 33 34 35 36
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.operations.startSegments import *
from gppylib.operations.buildMirrorSegments import *
from gppylib.operations.rebalanceSegments import GpSegmentRebalanceOperation
from gppylib.programs import programIoUtils
from gppylib.system import configurationInterface as configInterface
from gppylib.system.environment import GpMasterEnvironment
37 38
from gppylib.parseutils import line_reader, check_values, canonicalize_address
from gppylib.utils import writeLinesToFile, normalizeAndValidateInputPath, TableLogger
39 40
from gppylib.gphostcache import GpInterfaceToHostNameCache
from gppylib.operations.utils import ParallelOperation
C
Chumki Roy 已提交
41
from gppylib.operations.package import SyncPackages
42
from gppylib.heapchecksum import HeapChecksum
43
from gppylib.mainUtils import ExceptionNoStackTraceNeeded
44 45 46

logger = gplog.get_default_logger()

47

48 49 50 51 52 53 54 55 56
class PortAssigner:
    """
    Used to assign new ports to segments on a host

    Note that this could be improved so that we re-use ports for segments that are being recovered but this
      does not seem necessary.
    """

    # Ports are assigned from the cluster's minimum segment port up to (but
    # excluding) this value.
    MAX_PORT_EXCLUSIVE = 65536

    def __init__(self, gpArray):
        """
        Determine port information for recovering to a new host: record the
        minimum port in use by any QE segment (the starting point for new
        assignments) and, per host, the set of ports already taken.

        @param gpArray the GpArray describing the current cluster
        @raises Exception when the array contains no QE segment ports
        """
        segments = gpArray.getDbList()
        ports = [seg.getSegmentPort() for seg in segments if seg.isSegmentQE()]

        if not ports:
            raise Exception("No segment ports found in array.")
        self.__minPort = min(ports)

        self.__usedPortsByHostName = {}

        byHost = GpArray.getSegmentsByHostName(segments)
        # items() instead of iteritems() so this works on both Python 2 and 3;
        # a distinct loop name avoids clobbering the outer 'segments' list.
        for hostName, hostSegments in byHost.items():
            usedPorts = self.__usedPortsByHostName[hostName] = {}
            for seg in hostSegments:
                usedPorts[seg.getSegmentPort()] = True

    def findAndReservePort(self, hostName, address):
        """
        Find a port not used by any postmaster process.
        When found, add an entry:  usedPorts[port] = True   and return the port found
        Otherwise raise an exception labeled with the given address
        """
        # setdefault creates the per-host reservation map on first use.
        usedPorts = self.__usedPortsByHostName.setdefault(hostName, {})

        for port in range(self.__minPort, PortAssigner.MAX_PORT_EXCLUSIVE):
            if port not in usedPorts:
                usedPorts[port] = True
                return port
        raise Exception("Unable to assign port on %s" % address)


96 97
# -------------------------------------------------------------------------

98
class RemoteQueryCommand(Command):
    """Run one SQL query against a specific host/port in utility mode.

    Results are fetched eagerly in run() and cached on the instance so a
    caller (e.g. a WorkerPool consumer) can read them via get_results().
    """

    def __init__(self, qname, query, hostname, port, dbname=None):
        # qname is a short human-readable label used only in the debug log line.
        self.qname = qname
        self.query = query
        self.hostname = hostname
        self.port = port
        # Fall back to $PGDATABASE, then 'template1', when no dbname is given.
        self.dbname = dbname or os.environ.get('PGDATABASE', None) or 'template1'
        self.res = None

    def get_results(self):
        # Rows fetched by run(), or None if run() has not executed yet.
        return self.res

    def run(self):
        logger.debug('Executing query (%s:%s) for segment (%s:%s) on database (%s)' % (
            self.qname, self.query, self.hostname, self.port, self.dbname))
        # utility=True connects directly to the segment, bypassing the master.
        with dbconn.connect(dbconn.DbURL(hostname=self.hostname, port=self.port, dbname=self.dbname),
                            utility=True) as conn:
            self.res = dbconn.query(conn, self.query).fetchall()
        # NOTE(review): explicit close in addition to the 'with' block --
        # presumably dbconn's context manager does not close the connection
        # itself; confirm against the dbconn implementation.
        conn.close()
117

118 119 120
# -------------------------------------------------------------------------

class GpRecoverSegmentProgram:
121 122 123 124 125 126 127 128 129 130
    #
    # Constructor:
    #
    # @param options the options as returned by the options parser
    #
    def __init__(self, options):
        """
        Constructor.

        @param options the options as returned by the options parser
        """
        self.logger = logger
        self.__pool = None
        self.__options = options

        # Default showProgressInplace from the terminal: when the user gave no
        # value and stdout is a tty, emit escape sequences so gprecoverseg can
        # redraw progress in place; otherwise progress is not shown in place.
        if options.showProgressInplace is None:
            options.showProgressInplace = sys.stdout.isatty()

137 138 139 140 141 142 143 144 145 146 147 148 149

    def getProgressMode(self):
        """Translate the progress display options into a GpMirrorListToBuild.Progress value."""
        if not self.__options.showProgress:
            return GpMirrorListToBuild.Progress.NONE
        if self.__options.showProgressInplace:
            return GpMirrorListToBuild.Progress.INPLACE
        return GpMirrorListToBuild.Progress.SEQUENTIAL


150 151 152
    def outputToFile(self, mirrorBuilder, gpArray, fileName):
        """Write a recovery configuration file describing each failure to fileName.

        One line per failed segment: 'address|port|datadir', optionally followed
        by a space and the same triple for the failover target (when one exists).
        """
        def describe(seg):
            # Format one 'address|port|datadir' group for a segment.
            return '%s|%d|%s' % (canonicalize_address(seg.getSegmentAddress()),
                                 seg.getSegmentPort(),
                                 seg.getSegmentDataDirectory())

        lines = []
        for mirror in mirrorBuilder.getMirrorsToBuild():
            entry = describe(mirror.getFailedSegment())
            failover = mirror.getFailoverSegment()
            if failover is not None:
                entry += ' ' + describe(failover)
            lines.append(entry)
        writeLinesToFile(fileName, lines)

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
    def _getParsedRow(self, filename, lineno, line):
        """Parse one recovery config file line into a row dict.

        A line carries one whitespace-separated group for the failed segment
        ('address|port|datadir'), optionally followed by a second group for the
        recovery target.  Raises ExceptionNoStackTraceNeeded on format errors.
        """
        groups = line.split()  # NOT line.split(' ') due to MPP-15675
        if len(groups) not in [1, 2]:
            raise ExceptionNoStackTraceNeeded(
                "line %d of file %s: expected 1 or 2 groups but found %d" % (lineno, filename, len(groups)))

        failed_parts = groups[0].split('|')
        if len(failed_parts) != 3:
            raise ExceptionNoStackTraceNeeded(
                "line %d of file %s: expected 3 parts on failed segment group, obtained %d" % (
                    lineno, filename, len(failed_parts)))
        address, port, datadir = failed_parts
        check_values(lineno, address=address, port=port, datadir=datadir)
        row = {
            'failedAddress': address,
            'failedPort': port,
            'failedDataDirectory': datadir,
            'lineno': lineno
        }

        if len(groups) == 2:
            new_parts = groups[1].split('|')
            if len(new_parts) != 3:
                raise ExceptionNoStackTraceNeeded(
                    "line %d of file %s: expected 3 parts on new segment group, obtained %d" % (
                        lineno, filename, len(new_parts)))
            address2, port2, datadir2 = new_parts
            check_values(lineno, address=address2, port=port2, datadir=datadir2)
            row['newAddress'] = address2
            row['newPort'] = port2
            row['newDataDirectory'] = datadir2

        return row

205 206 207 208
    def getRecoveryActionsFromConfigFile(self, gpArray):
        """
        getRecoveryActionsFromConfigFile

        Build the recovery plan from the user-supplied config file (-i option).

        @param gpArray the current system configuration

        returns: a GpMirrorListToBuild object containing information of segments
                 which are able to recover
        """
        filename = self.__options.recoveryConfigFile
        rows = []
        with open(filename) as f:
            for lineno, line in line_reader(f):
                rows.append(self._getParsedRow(filename, lineno, line))

        # Resolve host names only for rows that specify a failover target.
        allAddresses = [row["newAddress"] for row in rows if "newAddress" in row]
        interfaceLookup = GpInterfaceToHostNameCache(self.__pool, allAddresses, [None]*len(allAddresses))

        failedSegments = []
        failoverSegments = []
        for row in rows:
            # find the failed segment: match address, port, and datadir against
            # every segment registered in the array
            failedAddress = row['failedAddress']
            failedPort = row['failedPort']
            failedDataDirectory = normalizeAndValidateInputPath(row['failedDataDirectory'],
                                                                "config file", row['lineno'])
            failedSegment = None
            for segment in gpArray.getDbList():
                if (segment.getSegmentAddress() == failedAddress
                        and str(segment.getSegmentPort()) == failedPort
                        and segment.getSegmentDataDirectory() == failedDataDirectory):

                    if failedSegment is not None:
                        # this could be an assertion -- configuration should not allow multiple entries!
                        raise Exception(("A segment to recover was found twice in configuration.  "
                                         "This segment is described by address|port|directory '%s|%s|%s' "
                                         "on the input line: %s") %
                                        (failedAddress, failedPort, failedDataDirectory, row['lineno']))
                    failedSegment = segment

            if failedSegment is None:
                raise Exception("A segment to recover was not found in configuration.  " \
                                "This segment is described by address|port|directory '%s|%s|%s' on the input line: %s" %
                                (failedAddress, failedPort, failedDataDirectory, row['lineno']))

            failoverSegment = None
            if "newAddress" in row:
                """
                When the second set was passed, the caller is going to tell us to where we need to failover, so
                  build a failover segment
                """
                # these two lines make it so that failoverSegment points to the object that is registered in gparray
                failoverSegment = failedSegment
                failedSegment = failoverSegment.copy()

                address = row["newAddress"]
                try:
                    port = int(row["newPort"])
                except ValueError:
                    raise Exception('Config file format error, invalid number value in line: %s' % (row['lineno']))

                dataDirectory = normalizeAndValidateInputPath(row["newDataDirectory"], "config file",
                                                              row['lineno'])

                hostName = interfaceLookup.getHostName(address)
                if hostName is None:
                    raise Exception('Unable to find host name for address %s from line:%s' % (address, row['lineno']))

                # now update values in failover segment
                failoverSegment.setSegmentAddress(address)
                failoverSegment.setSegmentHostName(hostName)
                failoverSegment.setSegmentPort(port)
                failoverSegment.setSegmentDataDirectory(dataDirectory)

            # this must come AFTER the if check above because failedSegment can be adjusted to
            #   point to a different object
            failedSegments.append(failedSegment)
            failoverSegments.append(failoverSegment)

        peersForFailedSegments = self.findAndValidatePeersForFailedSegments(gpArray, failedSegments)

        segs = []
        # NOTE(review): segs_with_persistent_mirroring_disabled is never
        # populated here, so the warning call below is effectively a no-op;
        # likely a leftover from an older persistent-mirroring code path.
        segs_with_persistent_mirroring_disabled = []
        for index, failedSegment in enumerate(failedSegments):
            peerForFailedSegment = peersForFailedSegments[index]

            # NOTE(review): peerForFailedSegmentDbId is assigned but unused.
            peerForFailedSegmentDbId = peerForFailedSegment.getSegmentDbId()
            segs.append(GpMirrorToBuild(failedSegment, peerForFailedSegment, failoverSegments[index],
                                        self.__options.forceFullResynchronization))

        self._output_segments_with_persistent_mirroring_disabled(segs_with_persistent_mirroring_disabled)

        return GpMirrorListToBuild(segs, self.__pool, self.__options.quiet,
                                   self.__options.parallelDegree, forceoverwrite=True,
                                   progressMode=self.getProgressMode())
298 299 300

    def findAndValidatePeersForFailedSegments(self, gpArray, failedSegments):
        """Return the live peer for each failed segment, in the same order.

        Raises when a failed segment has no peer, or when the peer itself is
        down (both halves of a content down means recovery cannot proceed).
        """
        dbIdToPeerMap = gpArray.getDbIdToPeerMap()

        peers = []
        for failed in failedSegments:
            peer = dbIdToPeerMap.get(failed.getSegmentDbId())
            if peer is None:
                raise Exception("No peer found for dbid %s" % failed.getSegmentDbId())
            if peer.isSegmentDown():
                raise Exception(
                    "Both segments for content %s are down; Try restarting Greenplum DB and running %s again." %
                    (peer.getSegmentContentId(), getProgramName()))
            peers.append(peer)
        return peers

    def getRecoveryActionsFromConfiguration(self, gpEnv, gpArray):
        """
        getRecoveryActionsFromConfiguration

        Build the recovery plan from the live configuration: every segment the
        array reports as down, optionally relocated to the -p recovery hosts.

        returns: a GpMirrorListToBuild object containing information of segments
                 which are able to recover
        """
        segments = gpArray.getSegDbList()

        failedSegments = [seg for seg in segments if seg.isSegmentDown()]
        peersForFailedSegments = self.findAndValidatePeersForFailedSegments(gpArray, failedSegments)

        # Dictionaries used for building mapping to new hosts
        recoverAddressMap = {}
        recoverHostMap = {}
        interfaceHostnameWarnings = []

        # Check if the array is a "standard" array
        (isStandardArray, _ignore) = gpArray.isStandardArray()

        recoverHostIdx = 0

        if self.__options.newRecoverHosts and len(self.__options.newRecoverHosts) > 0:
            for seg in failedSegments:
                segAddress = seg.getSegmentAddress()
                segHostname = seg.getSegmentHostName()

                # Haven't seen this hostname before so we put it on a new host
                # NOTE(review): has_key is Python-2-only; 'in' would work on both.
                if not recoverHostMap.has_key(segHostname):
                    try:
                        recoverHostMap[segHostname] = self.__options.newRecoverHosts[recoverHostIdx]
                    except:
                        # If we get here, not enough hosts were specified in the -p option.  Need 1 new host
                        # per 1 failed host.
                        raise Exception('Not enough new recovery hosts given for recovery.')
                    recoverHostIdx += 1

                if isStandardArray:
                    # We have a standard array configuration, so we'll try to use the same
                    # interface naming convention.  If this doesn't work, we'll correct it
                    # below on name lookup
                    segInterface = segAddress[segAddress.rfind('-'):]
                    destAddress = recoverHostMap[segHostname] + segInterface
                    destHostname = recoverHostMap[segHostname]
                else:
                    # Non standard configuration so we won't make assumptions on
                    # naming.  Instead we'll use the hostname passed in for both
                    # hostname and address and flag for warning later.
                    destAddress = recoverHostMap[segHostname]
                    destHostname = recoverHostMap[segHostname]

                # Save off the new host/address for this address.
                recoverAddressMap[segAddress] = (destHostname, destAddress)

            # Now that we've generated the mapping, look up all the addresses to make
            # sure they are resolvable.
            interfaces = [address for (_ignore, address) in recoverAddressMap.values()]
            interfaceLookup = GpInterfaceToHostNameCache(self.__pool, interfaces, [None] * len(interfaces))

            for key in recoverAddressMap.keys():
                (newHostname, newAddress) = recoverAddressMap[key]
                try:
                    addressHostnameLookup = interfaceLookup.getHostName(newAddress)
                    # Lookup failed so use hostname passed in for everything.
                    if addressHostnameLookup is None:
                        interfaceHostnameWarnings.append(
                            "Lookup of %s failed.  Using %s for both hostname and address." % (newAddress, newHostname))
                        newAddress = newHostname
                except:
                    # Catch all exceptions.  We will use hostname instead of address
                    # that we generated.
                    interfaceHostnameWarnings.append(
                        "Lookup of %s failed.  Using %s for both hostname and address." % (newAddress, newHostname))
                    newAddress = newHostname

                # if we've updated the address to use the hostname because of lookup failure
                # make sure the hostname is resolvable and up
                if newHostname == newAddress:
                    try:
                        unix.Ping.local("ping new hostname", newHostname)
                    except:
                        raise Exception("Ping of host %s failed." % newHostname)

                # Save changes in map
                recoverAddressMap[key] = (newHostname, newAddress)

            # Warn about -p hosts that were supplied but never assigned.
            if len(self.__options.newRecoverHosts) != recoverHostIdx:
                interfaceHostnameWarnings.append("The following recovery hosts were not needed:")
                for h in self.__options.newRecoverHosts[recoverHostIdx:]:
                    interfaceHostnameWarnings.append("\t%s" % h)

        portAssigner = PortAssigner(gpArray)

        forceFull = self.__options.forceFullResynchronization

        segs = []
        # NOTE(review): segs_with_persistent_mirroring_disabled is never
        # populated here; the warning call below is effectively a no-op.
        segs_with_persistent_mirroring_disabled = []
        for i in range(len(failedSegments)):

            failoverSegment = None
            failedSegment = failedSegments[i]
            liveSegment = peersForFailedSegments[i]

            if self.__options.newRecoverHosts and len(self.__options.newRecoverHosts) > 0:
                (newRecoverHost, newRecoverAddress) = recoverAddressMap[failedSegment.getSegmentAddress()]
                # these two lines make it so that failoverSegment points to the object that is registered in gparray
                failoverSegment = failedSegment
                failedSegment = failoverSegment.copy()
                failoverSegment.setSegmentHostName(newRecoverHost)
                failoverSegment.setSegmentAddress(newRecoverAddress)
                port = portAssigner.findAndReservePort(newRecoverHost, newRecoverAddress)
                failoverSegment.setSegmentPort(port)

            segs.append(GpMirrorToBuild(failedSegment, liveSegment, failoverSegment, forceFull))

        self._output_segments_with_persistent_mirroring_disabled(segs_with_persistent_mirroring_disabled)

        return GpMirrorListToBuild(segs, self.__pool, self.__options.quiet,
                                   self.__options.parallelDegree,
                                   interfaceHostnameWarnings,
                                   forceoverwrite=True,
                                   progressMode=self.getProgressMode())
435

436 437 438
    def _output_segments_with_persistent_mirroring_disabled(self, segs_persistent_mirroring_disabled=None):
        """Warn about segments skipped because persistent mirroring is disabled; no-op for None/empty."""
        if not segs_persistent_mirroring_disabled:
            return
        dbid_list = ', '.join(str(seg_id) for seg_id in segs_persistent_mirroring_disabled)
        self.logger.warn('Segments with dbid %s not recovered; persistent mirroring state is disabled.' %
                         (dbid_list))
440

441 442
    def getRecoveryActionsBasedOnOptions(self, gpEnv, gpArray):
        """Dispatch on the command-line options to build the recovery (or rebalance) plan."""
        options = self.__options
        if options.rebalanceSegments:
            return GpSegmentRebalanceOperation(gpEnv, gpArray)
        if options.recoveryConfigFile is not None:
            return self.getRecoveryActionsFromConfigFile(gpArray)
        return self.getRecoveryActionsFromConfiguration(gpEnv, gpArray)

    def syncPackages(self, new_hosts):
        """Sync installed Greenplum extensions to the given new hosts in parallel.

        The design decision here is to squash any ordinary exception resulting
        from the synchronization of packages.  We should *not* disturb the
        user's attempts to recover; failures are logged and the user is told
        how to clean up afterwards.
        """
        try:
            self.logger.info('Syncing Greenplum Database extensions')
            operations = [SyncPackages(host) for host in new_hosts]
            ParallelOperation(operations, self.__options.parallelDegree).run()
            # introspect outcomes: get_ret() re-raises any failure an operation captured
            for operation in operations:
                operation.get_ret()
        except Exception:
            # Was a bare 'except:': narrowed so KeyboardInterrupt/SystemExit
            # still propagate while the best-effort squash is preserved.
            self.logger.exception('Syncing of Greenplum Database extensions has failed.')
            self.logger.warning('Please run gppkg --clean after successful segment recovery.')
462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487

    def displayRecovery(self, mirrorBuilder, gpArray):
        """Log a human-readable summary of what the recovery (or rebalance) will do."""
        self.logger.info('Greenplum instance recovery parameters')
        self.logger.info('---------------------------------------------------------')

        # Describe which recovery mode the options selected.
        if self.__options.recoveryConfigFile:
            self.logger.info('Recovery from configuration -i option supplied')
        elif self.__options.newRecoverHosts is not None:
            self.logger.info('Recovery type              = Pool Host')
            for h in self.__options.newRecoverHosts:
                self.logger.info('Pool host for recovery     = %s' % h)
        elif self.__options.rebalanceSegments:
            self.logger.info('Recovery type              = Rebalance')
        else:
            self.logger.info('Recovery type              = Standard')

        if self.__options.rebalanceSegments:
            # Rebalance: list each segment currently running in its non-preferred role.
            i = 1
            total = len(gpArray.get_unbalanced_segdbs())
            for toRebalance in gpArray.get_unbalanced_segdbs():
                tabLog = TableLogger()
                self.logger.info('---------------------------------------------------------')
                self.logger.info('Unbalanced segment %d of %d' % (i, total))
                self.logger.info('---------------------------------------------------------')
                programIoUtils.appendSegmentInfoForOutput("Unbalanced", gpArray, toRebalance, tabLog)
                tabLog.info(["Balanced role", "= Primary" if toRebalance.preferred_role == 'p' else "= Mirror"])
                tabLog.info(["Current role", "= Primary" if toRebalance.role == 'p' else "= Mirror"])
                tabLog.outputTable()
                i += 1
        else:
            # Recovery: show sync mode, failed segment, source, and target for each mirror.
            i = 0
            total = len(mirrorBuilder.getMirrorsToBuild())
            for toRecover in mirrorBuilder.getMirrorsToBuild():
                self.logger.info('---------------------------------------------------------')
                self.logger.info('Recovery %d of %d' % (i + 1, total))
                self.logger.info('---------------------------------------------------------')

                tabLog = TableLogger()

                syncMode = "Full" if toRecover.isFullSynchronization() else "Incremental"
                tabLog.info(["Synchronization mode", "= " + syncMode])
                programIoUtils.appendSegmentInfoForOutput("Failed", gpArray, toRecover.getFailedSegment(), tabLog)
                programIoUtils.appendSegmentInfoForOutput("Recovery Source", gpArray, toRecover.getLiveSegment(),
                                                          tabLog)

                # No failover segment means the segment is recovered in place.
                if toRecover.getFailoverSegment() is not None:
                    programIoUtils.appendSegmentInfoForOutput("Recovery Target", gpArray,
                                                              toRecover.getFailoverSegment(), tabLog)
                else:
                    tabLog.info(["Recovery Target", "= in-place"])
                tabLog.outputTable()

                i = i + 1

        self.logger.info('---------------------------------------------------------')

    def __getSimpleSegmentLabel(self, seg):
        """Return a short 'address:datadir' label for a segment, for use in warnings."""
        return "%s:%s" % (canonicalize_address(seg.getSegmentAddress()),
                          seg.getSegmentDataDirectory())
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541

    def __displayRecoveryWarnings(self, mirrorBuilder):
        """Log every warning produced for this recovery plan."""
        warnings = self._getRecoveryWarnings(mirrorBuilder)
        for warning in warnings:
            self.logger.warn(warning)

    def _getRecoveryWarnings(self, mirrorBuilder):
        """
        return an array of string warnings regarding the recovery
        """
        res = []
        for toRecover in mirrorBuilder.getMirrorsToBuild():
            dest = toRecover.getFailoverSegment()
            if dest is None:
                continue

            # user specified a failover location -- warn if it's the same host
            # as its primary
            src = toRecover.getLiveSegment()
            if src.getSegmentHostName() == dest.getSegmentHostName():
                res.append("Segment is being recovered to the same host as its primary: "
                           "primary %s    failover target: %s"
                           % (self.__getSimpleSegmentLabel(src), self.__getSimpleSegmentLabel(dest)))

        res.extend(mirrorBuilder.getAdditionalWarnings())

        return res

    def _get_dblist(self):
        """Return a list of (datname,) rows for every connectable database."""
        # template0 does not accept any connections so we exclude it
        with dbconn.connect(dbconn.DbURL()) as conn:
            res = dbconn.query(conn, "SELECT datname FROM PG_DATABASE WHERE datname != 'template0'")
        # NOTE(review): explicit close after the 'with' -- presumably dbconn's
        # context manager does not close the connection itself; confirm.
        conn.close()
        # NOTE(review): fetchall() is called after conn.close(); this works only
        # if the cursor buffers its rows eagerly -- verify against dbconn.query.
        return res.fetchall()

557 558
    def run(self):
        if self.__options.parallelDegree < 1 or self.__options.parallelDegree > 64:
559 560
            raise ProgramArgumentValidationException(
                "Invalid parallelDegree provided with -B argument: %d" % self.__options.parallelDegree)
561

562
        self.__pool = WorkerPool(self.__options.parallelDegree)
563 564 565 566 567 568 569 570 571 572 573
        gpEnv = GpMasterEnvironment(self.__options.masterDataDirectory, True)

        # verify "where to recover" options
        optionCnt = 0
        if self.__options.newRecoverHosts is not None:
            optionCnt += 1
        if self.__options.recoveryConfigFile is not None:
            optionCnt += 1
        if self.__options.rebalanceSegments:
            optionCnt += 1
        if optionCnt > 1:
574
            raise ProgramArgumentValidationException("Only one of -i, -p, and -r may be specified")
575 576 577 578 579 580 581

        faultProberInterface.getFaultProber().initializeProber(gpEnv.getMasterPort())

        confProvider = configInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort())

        gpArray = confProvider.loadSystemConfig(useUtilityMode=False)

582
        if not gpArray.hasMirrors:
L
Larry Hamel 已提交
583
            raise ExceptionNoStackTraceNeeded(
584
                'GPDB Mirroring replication is not configured for this Greenplum Database instance.')
585 586 587 588 589 590

        # We have phys-rep/filerep mirrors.

        if self.__options.newRecoverHosts is not None:
            try:
                uniqueHosts = []
C
Chumki Roy 已提交
591 592 593
                for h in self.__options.newRecoverHosts.split(','):
                    if h.strip() not in uniqueHosts:
                        uniqueHosts.append(h.strip())
594 595
                self.__options.newRecoverHosts = uniqueHosts
            except Exception, ex:
596
                raise ProgramArgumentValidationException( \
597 598 599 600 601 602 603 604 605 606
                    "Invalid value for recover hosts: %s" % ex)

        # If it's a rebalance operation, make sure we are in an acceptable state to do that
        # Acceptable state is:
        #    - No segments down
        #    - No segments in change tracking or unsynchronized state
        if self.__options.rebalanceSegments:
            if len(gpArray.get_invalid_segdbs()) > 0:
                raise Exception("Down segments still exist.  All segments must be up to rebalance.")
            if len(gpArray.get_synchronized_segdbs()) != len(gpArray.getSegDbList()):
607 608
                raise Exception(
                    "Some segments are not yet synchronized.  All segments must be synchronized to rebalance.")
609 610 611 612 613 614 615

        # retain list of hosts that were existing in the system prior to getRecoverActions...
        # this will be needed for later calculations that determine whether
        # new hosts were added into the system
        existing_hosts = set(gpArray.getHostList())

        # figure out what needs to be done
J
Jimmy Yih 已提交
616
        mirrorBuilder = self.getRecoveryActionsBasedOnOptions(gpEnv, gpArray)
617 618 619 620 621 622

        if self.__options.outputSampleConfigFile is not None:
            # just output config file and done
            self.outputToFile(mirrorBuilder, gpArray, self.__options.outputSampleConfigFile)
            self.logger.info('Configuration file output to %s successfully.' % self.__options.outputSampleConfigFile)
        elif self.__options.rebalanceSegments:
623
            assert (isinstance(mirrorBuilder, GpSegmentRebalanceOperation))
624 625 626 627

            # Make sure we have work to do
            if len(gpArray.get_unbalanced_segdbs()) == 0:
                self.logger.info("No segments are running in their non-preferred role and need to be rebalanced.")
C
Chumki Roy 已提交
628
            else:
629
                self.displayRecovery(mirrorBuilder, gpArray)
C
Chumki Roy 已提交
630

631 632 633 634 635
                if self.__options.interactive:
                    self.logger.warn("This operation will cancel queries that are currently executing.")
                    self.logger.warn("Connections to the database however will not be interrupted.")
                    if not userinput.ask_yesno(None, "\nContinue with segment rebalance procedure", 'N'):
                        raise UserAbortedException()
C
Chumki Roy 已提交
636

637
                fullRebalanceDone = mirrorBuilder.rebalance()
638
                self.logger.info("******************************************************************")
639 640 641 642 643
                if fullRebalanceDone:
                    self.logger.info("The rebalance operation has completed successfully.")
                else:
                    self.logger.info("The rebalance operation has completed with WARNINGS."
                                     " Please review the output in the gprecoverseg log.")
644 645 646 647
                self.logger.info("There is a resynchronization running in the background to bring all")
                self.logger.info("segments in sync.")
                self.logger.info("Use gpstate -e to check the resynchronization progress.")
                self.logger.info("******************************************************************")
C
Chumki Roy 已提交
648

J
Jimmy Yih 已提交
649
        elif len(mirrorBuilder.getMirrorsToBuild()) == 0:
650 651 652
            self.logger.info('No segments to recover')
        else:
            mirrorBuilder.checkForPortAndDirectoryConflicts(gpArray)
653
            self.validate_heap_checksum_consistency(gpArray, mirrorBuilder)
654 655 656 657 658 659 660 661 662 663 664 665 666 667

            self.displayRecovery(mirrorBuilder, gpArray)
            self.__displayRecoveryWarnings(mirrorBuilder)

            if self.__options.interactive:
                if not userinput.ask_yesno(None, "\nContinue with segment recovery procedure", 'N'):
                    raise UserAbortedException()

            # sync packages
            current_hosts = set(gpArray.getHostList())
            new_hosts = current_hosts - existing_hosts
            if new_hosts:
                self.syncPackages(new_hosts)

668 669
            if not mirrorBuilder.buildMirrors("recover", gpEnv, gpArray):
                sys.exit(1)
C
Chumki Roy 已提交
670

J
Jialun 已提交
671
            self.trigger_fts_probe(port=gpEnv.getMasterPort())
J
Jimmy Yih 已提交
672

673
            self.logger.info("******************************************************************")
J
Jimmy Yih 已提交
674 675 676
            self.logger.info("Updating segments for streaming is completed.")
            self.logger.info("For segments updated successfully, streaming will continue in the background.")
            self.logger.info("Use  gpstate -s  to check the streaming progress.")
677 678
            self.logger.info("******************************************************************")

679
        sys.exit(0)
680

J
Jialun 已提交
681
    def trigger_fts_probe(self, port=0):
        """Ask the master to run an FTS (fault tolerance service) probe scan.

        :param port: master port to connect to (passed through to DbURL).
        """
        self.logger.info('Triggering FTS probe')
        conn = dbconn.connect(dbconn.DbURL(port=port))
        try:
            # XXX Perform two probe scans in a row, to work around a known
            # race where gp_request_fts_probe_scan() can return early during the
            # first call. Remove this duplication once that race is fixed.
            for _ in range(2):
                dbconn.execSQL(conn, "SELECT gp_request_fts_probe_scan()")
        finally:
            # Close the connection even if the probe SQL raises, so a failed
            # probe does not leak a master connection.
            conn.close()
J
Jimmy Yih 已提交
691

692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715
    def validate_heap_checksum_consistency(self, gpArray, mirrorBuilder):
        """Check that the heap checksum setting agrees between the master and
        every live segment acting as a recovery source.

        Logs and returns immediately when there is nothing to recover; raises
        when no segment responds, or when any responding segment's setting
        differs from the master's.
        """
        live_segments = [t.getLiveSegment() for t in mirrorBuilder.getMirrorsToBuild()]
        if not live_segments:
            self.logger.info("No checksum validation necessary when there are no segments to recover.")
            return

        checker = HeapChecksum(gpArray, num_workers=len(live_segments), logger=self.logger)
        successes, failures = checker.get_segments_checksum_settings(live_segments)
        # Proceed as long as at least one segment replied to the ssh query.
        if not successes:
            raise Exception("No segments responded to ssh query for heap checksum validation.")

        consistent, inconsistent, master_checksum_value = checker.check_segment_consistency(successes)
        if inconsistent:
            self.logger.fatal("Heap checksum setting differences reported on segments")
            self.logger.fatal("Failed checksum consistency validation:")
            for seg in inconsistent:
                self.logger.fatal("%s checksum set to %s differs from master checksum set to %s" %
                                  (seg.getSegmentHostName(), seg.heap_checksum, master_checksum_value))
            raise Exception("Heap checksum setting differences reported on segments")

        self.logger.info("Heap checksum setting is consistent between master and the segments that are candidates "
                         "for recoverseg")

716 717
    def cleanup(self):
        """Tear down the worker pool, if one was ever created."""
        if not self.__pool:
            return
        # MPP-13489, CR-2572: all three of these calls appear necessary;
        # see MPP-12633, CR-2252 as well.
        self.__pool.haltWork()
        self.__pool.joinWorkers()
        self.__pool.join()

    # -------------------------------------------------------------------------
723 724 725 726 727 728 729 730

    @staticmethod
    def createParser():
        """Build and return the gprecoverseg command-line option parser."""
        description = ("Recover a failed segment")
        help = [""]

        parser = OptParser(option_class=OptChecker,
                           description=' '.join(description.split()),
                           version='%prog version $Revision$')
        parser.setHelp(help)

        # Standard logging/help flags plus progress-display tweaks.
        loggingGroup = addStandardLoggingAndHelpOptions(parser, True)
        loggingGroup.add_option("-s", None,
                                dest='showProgressInplace', default=None, action='store_false',
                                help='Show pg_basebackup progress sequentially instead of inplace')
        loggingGroup.add_option("--no-progress",
                                dest="showProgress", default=True, action="store_false",
                                help="Suppress pg_basebackup progress output")

        connectionGroup = OptionGroup(parser, "Connection Options")
        parser.add_option_group(connectionGroup)
        addMasterDirectoryOptionForSingleClusterProgram(connectionGroup)

        sourceGroup = OptionGroup(parser, "Recovery Source Options")
        parser.add_option_group(sourceGroup)
        sourceGroup.add_option("-i", None, type="string",
                               dest="recoveryConfigFile",
                               metavar="<configFile>",
                               help="Recovery configuration file")
        sourceGroup.add_option("-o", None, type="string",
                               dest="outputSampleConfigFile",
                               metavar="<configFile>",
                               help="Sample configuration file name to output; "
                                    "this file can be passed to a subsequent call using -i option")

        destinationGroup = OptionGroup(parser, "Recovery Destination Options")
        parser.add_option_group(destinationGroup)
        destinationGroup.add_option("-p", None, type="string",
                                    dest="newRecoverHosts",
                                    metavar="<targetHosts>",
                                    help="Spare new hosts to which to recover segments")

        recoveryGroup = OptionGroup(parser, "Recovery Options")
        parser.add_option_group(recoveryGroup)
        recoveryGroup.add_option('-F', None, default=False, action='store_true',
                                 dest="forceFullResynchronization",
                                 metavar="<forceFullResynchronization>",
                                 help="Force full segment resynchronization")
        recoveryGroup.add_option("-B", None, type="int", default=16,
                                 dest="parallelDegree",
                                 metavar="<parallelDegree>",
                                 help="Max # of workers to use for building recovery segments.  [default: %default]")
        recoveryGroup.add_option("-r", None, default=False, action='store_true',
                                 dest='rebalanceSegments', help='Rebalance synchronized segments.')

        parser.set_defaults()
        return parser

    @staticmethod
    def createProgram(options, args):
        """Construct the program object; positional arguments are rejected."""
        if args:
            raise ProgramArgumentValidationException("too many arguments: only options may be specified", True)
        return GpRecoverSegmentProgram(options)
C
Chumki Roy 已提交
787

788 789 790
    @staticmethod
    def mainOptions():
        """
C
Chumki Roy 已提交
791
        The dictionary this method returns instructs the simple_main framework
T
Tyler Ramer 已提交
792
        to check for a gprecoverseg.lock file under MASTER_DATA_DIRECTORY to
793 794 795
        prevent the customer from trying to run more than one instance of
        gprecoverseg at the same time.
        """
T
Tyler Ramer 已提交
796
        return {'pidlockpath': 'gprecoverseg.lock', 'parentpidvar': 'GPRECOVERPID'}