#!/usr/bin/env python
# Line too long            - pylint: disable=C0301
# Invalid name             - pylint: disable=C0103
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
from gppylib.mainUtils import getProgramName

import copy
import datetime
import os
import sys
import socket
import signal
import traceback
from time import strftime, sleep

try:
    from gppylib.commands.unix import *
    from gppylib.fault_injection import inject_fault
    from gppylib.commands.gp import *
    from gppylib.commands.pg import PgControlData
    from gppylib.gparray import GpArray, MODE_CHANGELOGGING, STATUS_DOWN
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gplog import *
    from gppylib.db import catalog
    from gppylib.db import dbconn
    from gppylib.userinput import *
    from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS
    from gppylib.system import configurationInterface, configurationImplGpdb
    from gppylib.system.environment import GpMasterEnvironment
    from pygresql.pgdb import DatabaseError
    from pygresql import pg
    from gppylib.gpcatalog import MASTER_ONLY_TABLES
    from gppylib.operations.package import SyncPackages
    from gppylib.operations.utils import ParallelOperation
    from gppylib.parseutils import line_reader, parse_gpexpand_segment_line, \
        canonicalize_address
    from gppylib.operations.filespace import PG_SYSTEM_FILESPACE, GP_TRANSACTION_FILES_FILESPACE, \
        GP_TEMPORARY_FILES_FILESPACE, GetCurrentFilespaceEntries, GetFilespaceEntries, GetFilespaceEntriesDict, \
        RollBackFilespaceChanges, GetMoveOperationList, FileType, UpdateFlatFiles
    from gppylib.heapchecksum import HeapChecksum

except ImportError, e:
    sys.exit('ERROR: Cannot import modules.  Please check that you have sourced greenplum_path.sh.  Detail: ' + str(e))

# constants
MAX_PARALLEL_EXPANDS = 96
MAX_BATCH_SIZE = 128

GPDB_STOPPED = 1
GPDB_STARTED = 2
GPDB_UTILITY = 3
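# These constants name the cluster states requested via gpexpand.get_gpdb_in_state():
# fully stopped, fully started, or started in master-only (utility) mode.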

FILE_SPACES_INPUT_FILENAME_SUFFIX = ".fs"
SEGMENT_CONFIGURATION_BACKUP_FILE = "gpexpand.gp_segment_configuration"
FILE_SPACES_INPUT_FILE_LINE_1_PREFIX = "filespaceOrder"

#global var
_gp_expand = None

description = ("""
Adds additional segments to a pre-existing GPDB Array.
""")

_help = ["""
The input file should be a plain text file with a line for each segment
to add with the format:

  <hostname>:<port>:<data_directory>:<dbid>:<content>:<definedprimary>
""",
         """
         If an input file is not specified, gpexpand will ask a series of questions
         and create one.
         """,
         ]
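# An input file line might look like the following (hostname, port and data
# directory are illustrative values only):
#   sdw5:40000:/data1/primary/gpseg16:18:16:true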

_TODO = ["""

Remaining TODO items:
====================
""",

         """* smarter heuristics on setting ranks. """,

         """* make sure system isn't in "readonly mode" during setup. """,

         """* need a startup validation where we check the status detail
             with gp_distribution_policy and make sure that our book
             keeping matches reality. we don't have a perfect transactional
             model since the tables can be in a different database from
             where the gpexpand schema is kept. """,

         """* currently requires that GPHOME and PYTHONPATH be set on all of the remote hosts of
              the system.  should get rid of this requirement. """
         ]

_usage = """[-f hosts_file] [-D database_name]

gpexpand -i input_file [-D database_name] [-B batch_size] [-V] [-t segment_tar_dir] [-S]

gpexpand [-d duration[hh][:mm[:ss]] | [-e 'YYYY-MM-DD hh:mm:ss']]
         [-a] [-n parallel_processes] [-D database_name]

gpexpand -r [-D database_name]

gpexpand -c [-D database_name]

gpexpand -? | -h | --help | --verbose | -v"""

EXECNAME = os.path.split(__file__)[-1]


# ----------------------- Command line option parser ----------------------

def parseargs():
    parser = OptParser(option_class=OptChecker,
                       description=' '.join(description.split()),
                       version='%prog version $Revision$')
    parser.setHelp(_help)
    parser.set_usage('%prog ' + _usage)
    parser.remove_option('-h')

    parser.add_option('-c', '--clean', action='store_true',
                      help='remove the expansion schema.')
    parser.add_option('-r', '--rollback', action='store_true',
                      help='rollback failed expansion setup.')
    parser.add_option('-V', '--novacuum', action='store_true',
                      help='Do not vacuum catalog tables before creating schema copy.')
    parser.add_option('-a', '--analyze', action='store_true',
                      help='Analyze the expanded table after redistribution.')
    parser.add_option('-d', '--duration', type='duration', metavar='[h][:m[:s]]',
                      help='duration from beginning to end.')
    parser.add_option('-e', '--end', type='datetime', metavar='datetime',
                      help="ending date and time in the format 'YYYY-MM-DD hh:mm:ss'.")
    parser.add_option('-i', '--input', dest="filename",
                      help="input expansion configuration file.", metavar="FILE")
    parser.add_option('-f', '--hosts-file', metavar='<hosts_file>',
                      help='file containing new host names used to generate input file')
    parser.add_option('-D', '--database', dest='database',
                      help='Database to create the gpexpand schema and tables in.  If this ' \
                           'option is not given, PGDATABASE will be used.  The template1, ' \
                           'template0 and postgres databases cannot be used.')
    parser.add_option('-B', '--batch-size', type='int', default=16, metavar="<batch_size>",
                      help='Expansion configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE)
    parser.add_option('-n', '--parallel', type="int", default=1, metavar="<parallel_processes>",
                      help='number of tables to expand at a time. Valid values are 1-%d.' % MAX_PARALLEL_EXPANDS)
    parser.add_option('-v', '--verbose', action='store_true',
                      help='debug output.')
    parser.add_option('-S', '--simple-progress', action='store_true',
                      help='show simple progress.')
    parser.add_option('-t', '--tardir', default='.', metavar="FILE",
                      help='Tar file directory.')
    parser.add_option('-h', '-?', '--help', action='help',
                      help='show this help message and exit.')
    parser.add_option('-s', '--silent', action='store_true',
                      help='Do not prompt for confirmation to proceed on warnings')
    parser.add_option('--usage', action="briefhelp")

    parser.set_defaults(verbose=False, filters=[], slice=(None, None))

    # Parse the command line arguments
    (options, args) = parser.parse_args()
    return options, args, parser

def validate_options(options, args, parser):
    if len(args) > 0:
        logger.error('Unknown argument %s' % args[0])
        parser.exit()

    # -n sanity check
    if options.parallel > MAX_PARALLEL_EXPANDS or options.parallel < 1:
        logger.error('Invalid argument.  parallel value must be >= 1 and <= %d' % MAX_PARALLEL_EXPANDS)
        parser.print_help()
        parser.exit()

    proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
    if options.batch_size == 16 and proccount is not None:
        options.batch_size = int(proccount)

    if options.batch_size < 1 or options.batch_size > MAX_BATCH_SIZE:
        logger.error('Invalid argument.  -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE)
        parser.print_help()
        parser.exit()

    # OptParse can return date instead of datetime so we might need to convert
    if options.end and not isinstance(options.end, datetime.datetime):
        options.end = datetime.datetime.combine(options.end, datetime.time(0))

    if options.end and options.end < datetime.datetime.now():
        logger.error('End time occurs in the past')
        parser.print_help()
        parser.exit()

    if options.end and options.duration:
        logger.warn('Both end and duration options were given.')
        # Both a duration and an end time were given.
        if options.end > datetime.datetime.now() + options.duration:
            logger.warn('The duration argument will be used for the expansion end time.')
            options.end = datetime.datetime.now() + options.duration
        else:
            logger.warn('The end argument will be used for the expansion end time.')
    elif options.duration:
        options.end = datetime.datetime.now() + options.duration

    # -c and -r options are mutually exclusive
    if options.rollback and options.clean:
        rollbackOpt = "--rollback" if "--rollback" in sys.argv else "-r"
        cleanOpt = "--clean" if "--clean" in sys.argv else "-c"
        logger.error("%s and %s options cannot be specified together." % (rollbackOpt, cleanOpt))
        parser.exit()

    try:
        options.master_data_directory = get_masterdatadir()
        options.gphome = get_gphome()
    except GpError, msg:
        logger.error(msg)
        parser.exit()

    if not os.path.exists(options.master_data_directory):
        logger.error('Master data directory does not exist.')
        parser.exit()

    if options.database and (options.database.lower() == 'template0'
                             or options.database.lower() == 'template1'
                             or options.database.lower() == 'postgres'):
        logger.error('%s cannot be used to store the gpexpand schema and tables' % options.database)
        parser.exit()
    elif not options.database:
        options.database = os.getenv('PGDATABASE')

    options.pgport = int(os.getenv('PGPORT', 5432))

    return options, args


# -------------------------------------------------------------------------
# process information functions
def create_pid_file(master_data_directory):
    """Creates gpexpand pid file"""
    fp = None
    try:
        fp = open(master_data_directory + '/gpexpand.pid', 'w')
        fp.write(str(os.getpid()))
    except IOError:
        raise
    finally:
        if fp: fp.close()


def remove_pid_file(master_data_directory):
    """Removes gpexpand pid file"""
    try:
        os.unlink(master_data_directory + '/gpexpand.pid')
    except:
        pass


def is_gpexpand_running(master_data_directory):
    """Checks if there is another instance of gpexpand running"""
    is_running = False
    try:
        fp = open(master_data_directory + '/gpexpand.pid', 'r')
        pid = int(fp.readline().strip())
        fp.close()
        is_running = check_pid(pid)
    except IOError:
        pass
    except Exception:
        raise

    return is_running


def gpexpand_status_file_exists(master_data_directory):
    """Checks if gpexpand.pid exists"""
    return os.path.exists(master_data_directory + '/gpexpand.status')


# -------------------------------------------------------------------------
# expansion schema

undone_status = "NOT STARTED"
start_status = "IN PROGRESS"
done_status = "COMPLETED"
does_not_exist_status = 'NO LONGER EXISTS'

gpexpand_schema = 'gpexpand'
create_schema_sql = "CREATE SCHEMA " + gpexpand_schema
drop_schema_sql = "DROP schema IF EXISTS %s CASCADE" % gpexpand_schema

status_table = 'status'
status_table_sql = """CREATE TABLE %s.%s
                        ( status text,
                          updated timestamp ) """ % (gpexpand_schema, status_table)

status_detail_table = 'status_detail'
status_detail_table_sql = """CREATE TABLE %s.%s
                        ( dbname text,
                          fq_name text,
                          schema_oid oid,
                          table_oid oid,
                          distribution_policy smallint[],
                          distribution_policy_names text,
                          distribution_policy_coloids text,
                          storage_options text,
                          rank int,
                          status text,
                          expansion_started timestamp,
                          expansion_finished timestamp,
                          source_bytes numeric ) """ % (gpexpand_schema, status_detail_table)
# gpexpand views
progress_view = 'expansion_progress'
progress_view_simple_sql = """CREATE VIEW %s.%s AS
SELECT
    CASE status
        WHEN '%s' THEN 'Tables Expanded'
        WHEN '%s' THEN 'Tables Left'
    END AS Name,
    count(*)::text AS Value
FROM %s.%s GROUP BY status""" % (gpexpand_schema, progress_view,
                                 done_status, undone_status, gpexpand_schema, status_detail_table)
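# The full progress view below additionally estimates throughput and time to
# completion:
#   rate (MB/s)   = SUM(source_bytes) / (1 + elapsed_seconds) / 1024 / 1024
#   ETA (seconds) = remaining_bytes / (1 + bytes_done_per_second)
# where elapsed_seconds runs from the earliest expansion_started to the latest
# expansion_finished among tables completed in the current expansion run.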

progress_view_sql = """CREATE VIEW %s.%s AS
SELECT
    CASE status
        WHEN '%s' THEN 'Tables Expanded'
        WHEN '%s' THEN 'Tables Left'
        WHEN '%s' THEN 'Tables In Progress'
    END AS Name,
    count(*)::text AS Value
FROM %s.%s GROUP BY status

UNION

SELECT
    CASE status
        WHEN '%s' THEN 'Bytes Done'
        WHEN '%s' THEN 'Bytes Left'
        WHEN '%s' THEN 'Bytes In Progress'
    END AS Name,
    SUM(source_bytes)::text AS Value
FROM %s.%s GROUP BY status

UNION

SELECT
    'Estimated Expansion Rate' AS Name,
    (SUM(source_bytes) / (1 + extract(epoch FROM (max(expansion_finished) - min(expansion_started)))) / 1024 / 1024)::text || ' MB/s' AS Value
FROM %s.%s
WHERE status = '%s'
AND
expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY updated DESC LIMIT 1)

UNION

SELECT
'Estimated Time to Completion' AS Name,
CAST((SUM(source_bytes) / (
SELECT 1 + SUM(source_bytes) / (1 + (extract(epoch FROM (max(expansion_finished) - min(expansion_started)))))
FROM %s.%s
WHERE status = '%s'
AND
expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY
updated DESC LIMIT 1)))::text || ' seconds' as interval)::text AS Value
FROM %s.%s
WHERE status = '%s'
  OR status = '%s'""" % (gpexpand_schema, progress_view,
                         done_status, undone_status, start_status,
                         gpexpand_schema, status_detail_table,
                         done_status, undone_status, start_status,
                         gpexpand_schema, status_detail_table,
                         gpexpand_schema, status_detail_table,
                         done_status,
                         gpexpand_schema, status_table,
                         'EXPANSION STARTED',
                         gpexpand_schema, status_detail_table,
                         done_status,
                         gpexpand_schema, status_table,
                         'EXPANSION STARTED',
                         gpexpand_schema, status_detail_table,
                         start_status, undone_status)

unalterable_table_sql = """
SELECT
    current_database() AS database,
    pg_catalog.quote_ident(nspname) || '.' ||
    pg_catalog.quote_ident(relname) AS table,
    attnum,
    attlen,
    attbyval,
    attstorage,
    attalign,
    atttypmod,
    attndims,
    reltoastrelid != 0 AS istoasted
FROM
    pg_catalog.pg_attribute,
    pg_catalog.pg_class,
    pg_catalog.pg_namespace
WHERE
    attisdropped
    AND attnum >= 0
    AND attrelid = pg_catalog.pg_class.oid
    AND relnamespace = pg_catalog.pg_namespace.oid
    AND (attlen, attbyval, attalign, attstorage) NOT IN
        (SELECT typlen, typbyval, typalign, typstorage
        FROM pg_catalog.pg_type
        WHERE typisdefined AND typtype='b' )
ORDER BY
    attrelid, attnum
"""

has_unique_index_sql = """
SELECT
    current_database() || '.' || pg_catalog.quote_ident(nspname) || '.' || pg_catalog.quote_ident(relname) AS table
FROM
    pg_class c,
    pg_namespace n,
    pg_index i
WHERE
  i.indrelid = c.oid
  AND c.relnamespace = n.oid
  AND i.indisunique
  AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast',
                        'pg_bitmapindex', 'pg_aoseg')
"""


# -------------------------------------------------------------------------
class InvalidStatusError(Exception): pass


class ValidationError(Exception): pass


# -------------------------------------------------------------------------
class GpExpandStatus():
    """Class that manages gpexpand status file.

    The status file is placed in the master data directory on both the master and
    the standby master.  It's used to keep track of where we are in the progression.
    """

    def __init__(self, logger, master_data_directory, master_mirror=None):
        self.logger = logger

        self._status_values = {'UNINITIALIZED': 1,
                               'EXPANSION_PREPARE_STARTED': 2,
                               'BUILD_SEGMENT_TEMPLATE_STARTED': 3,
                               'BUILD_SEGMENT_TEMPLATE_DONE': 4,
                               'BUILD_SEGMENTS_STARTED': 5,
                               'BUILD_SEGMENTS_DONE': 6,
                               'UPDATE_OLD_SEGMENTS_STARTED': 7,
                               'UPDATE_OLD_SEGMENTS_DONE': 8,
                               'UPDATE_CATALOG_STARTED': 9,
                               'UPDATE_CATALOG_DONE': 10,
                               'SETUP_EXPANSION_SCHEMA_STARTED': 11,
                               'SETUP_EXPANSION_SCHEMA_DONE': 12,
                               'PREPARE_EXPANSION_SCHEMA_STARTED': 13,
                               'PREPARE_EXPANSION_SCHEMA_DONE': 14,
                               'EXPANSION_PREPARE_DONE': 15
                               }
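        # The numeric values define the strict ordering of the phases; set_status()
        # only allows advancing forward by exactly one step at a time.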
        self._status = []
        self._status_info = []
        self._master_data_directory = master_data_directory
        self._master_mirror = master_mirror
        self._status_filename = master_data_directory + '/gpexpand.status'
        self._status_standby_filename = master_data_directory + '/gpexpand.standby.status'
        self._fp = None
        self._fp_standby = None
        self._temp_dir = None
        self._input_filename = None
        self._original_primary_count = None
        self._gp_segment_configuration_backup = None

        if os.path.exists(self._status_filename):
            self._read_status_file()

    def _read_status_file(self):
        """Reads in an existing gpexpand status file"""
        self.logger.debug("Trying to read in a pre-existing gpexpand status file")
        try:
            self._fp = open(self._status_filename, 'a+')
            self._fp.seek(0)

            for line in self._fp:
                (status, status_info) = line.rstrip().split(':')
                if status == 'BUILD_SEGMENT_TEMPLATE_STARTED':
                    self._temp_dir = status_info
                elif status == 'BUILD_SEGMENTS_STARTED':
                    self._seg_tarfile = status_info
                elif status == 'BUILD_SEGMENTS_DONE':
                    self._number_new_segments = status_info
                elif status == 'EXPANSION_PREPARE_STARTED':
                    self._input_filename = status_info
                elif status == 'UPDATE_OLD_SEGMENTS_STARTED':
                    self._original_primary_count = int(status_info)
                elif status == 'UPDATE_CATALOG_STARTED':
                    self._gp_segment_configuration_backup = status_info

                self._status.append(status)
                self._status_info.append(status_info)
        except IOError:
            raise

        if not self._status_values.has_key(self._status[-1]):
            raise InvalidStatusError('Invalid status file.  Unknown status %s' % self._status)

    def create_status_file(self):
        """Creates a new gpexpand status file"""
        try:
            self._fp = open(self._status_filename, 'w')
            if self._master_mirror:
                self._fp_standby = open(self._status_standby_filename, 'w')
                self._fp_standby.write('UNINITIALIZED:None\n')
                self._fp_standby.flush()
            self._fp.write('UNINITIALIZED:None\n')
            self._fp.flush()
            self._status.append('UNINITIALIZED')
            self._status_info.append('None')
        except IOError:
            raise

        self._sync_status_file()

    def _sync_status_file(self):
        """Syncs the gpexpand status file with the master mirror"""
        if self._master_mirror:
            cpCmd = RemoteCopy('gpexpand copying status file to master mirror',
                               self._status_standby_filename, self._master_mirror.getSegmentHostName(),
                               self._status_filename)
            cpCmd.run(validateAfter=True)

    def set_status(self, status, status_info=None):
        """Sets the current status.  gpexpand status must be set in
           proper order.  Any out-of-order status results in an
           InvalidStatusError exception"""
        self.logger.debug("Transitioning from %s to %s" % (self._status[-1], status))

        if not self._fp:
            raise InvalidStatusError('The status file is invalid and cannot be written to')
        if not self._status_values.has_key(status):
            raise InvalidStatusError('%s is an invalid gpexpand status' % status)
        # Only allow state transitions forward by exactly one step
        if self._status and \
                        self._status_values[status] != self._status_values[self._status[-1]] + 1:
            raise InvalidStatusError('Invalid status transition from %s to %s' % (self._status[-1], status))
        if self._master_mirror:
            self._fp_standby.write('%s:%s\n' % (status, status_info))
            self._fp_standby.flush()
            self._sync_status_file()
        self._fp.write('%s:%s\n' % (status, status_info))
        self._fp.flush()
        self._status.append(status)
        self._status_info.append(status_info)

    def get_current_status(self):
        """Gets the current status that has been written to the gpexpand
           status file"""
        if (len(self._status) > 0 and len(self._status_info) > 0):
            return (self._status[-1], self._status_info[-1])
        else:
            return (None, None)

    def get_status_history(self):
        """Gets the full status history"""
        return zip(self._status, self._status_info)

    def remove_status_file(self):
        """Closes and removes the gpexand status file"""
        if self._fp:
            self._fp.close()
            self._fp = None
        if self._fp_standby:
            self._fp_standby.close()
            self._fp_standby = None
        if os.path.exists(self._status_filename):
            os.unlink(self._status_filename)
        if os.path.exists(self._status_standby_filename):
            os.unlink(self._status_standby_filename)
        if self._master_mirror:
            RemoveFile.remote('gpexpand master mirror status file cleanup',
                              self._master_mirror.getSegmentHostName(),
                              self._status_filename)

    def remove_segment_configuration_backup_file(self):
        """ Remove the segment configuration backup file """
        self.logger.debug("Removing segment configuration backup file")
        if self._gp_segment_configuration_backup is not None and \
                os.path.exists(self._gp_segment_configuration_backup):
            os.unlink(self._gp_segment_configuration_backup)

    def get_temp_dir(self):
        """Gets temp dir that was used during template creation"""
        return self._temp_dir

    def get_input_filename(self):
        """Gets input file that was used by expansion setup"""
        return self._input_filename

    def get_seg_tarfile(self):
        """Gets tar file that was used during template creation"""
        return self._seg_tarfile

    def get_number_new_segments(self):
        """ Gets the number of new segments added """
        return self._number_new_segments

    def get_original_primary_count(self):
        """Returns the original number of primary segments"""
        return self._original_primary_count

    def get_gp_segment_configuration_backup(self):
        """Gets the filename of the gp_segment_configuration backup file
        created during expansion setup"""
        return self._gp_segment_configuration_backup

    def set_gp_segment_configuration_backup(self, filename):
        """Sets the filename of the gp_segment_configuration backup file"""
        self._gp_segment_configuration_backup = filename

    def is_standby(self):
        """Returns True if running on standby"""
        return os.path.exists(self._master_data_directory + self._status_standby_filename)


# -------------------------------------------------------------------------

class ExpansionError(Exception): pass


class SegmentTemplateError(Exception): pass


# -------------------------------------------------------------------------
class SegmentTemplate:
    """Class for creating, distributing and deploying new segments to an
    existing GPDB array"""

    def __init__(self, logger, statusLogger, pool,
                 gparray, masterDataDirectory,
                 dburl, conn, noVacuumCatalog, tempDir, batch_size,
                 segTarDir='.', schemaTarFile='gpexpand_schema.tar'):
        self.logger = logger
        self.statusLogger = statusLogger
        self.pool = pool
        self.gparray = gparray
        self.noVacuumCatalog = noVacuumCatalog
        self.tempDir = tempDir
        self.batch_size = batch_size
        self.dburl = dburl
        self.conn = conn
        self.masterDataDirectory = masterDataDirectory
        self.schema_tar_file = schemaTarFile
        self.maxDbId = self.gparray.get_max_dbid()
        self.segTarDir = segTarDir
        self.segTarFile = os.path.join(segTarDir, self.schema_tar_file)

        hosts = []
        for seg in self.gparray.getExpansionSegDbList():
            hosts.append(seg.getSegmentHostName())
        self.hosts = SegmentTemplate.consolidate_hosts(pool, hosts)
        logger.debug('Hosts: %s' % self.hosts)

    @staticmethod
    def consolidate_hosts(pool, hosts):
        tmpHosts = {}
        consolidatedHosts = []

        for host in hosts:
            tmpHosts[host] = 0

        for host in tmpHosts.keys():
            hostnameCmd = Hostname('gpexpand associating hostnames with segments', ctxt=REMOTE, remoteHost=host)
            pool.addCommand(hostnameCmd)

        pool.join()

        finished_cmds = pool.getCompletedItems()

        for cmd in finished_cmds:
            if not cmd.was_successful():
                raise SegmentTemplateError(cmd.get_results())
            if cmd.get_hostname() not in consolidatedHosts:
                logger.debug('Adding %s to host list' % cmd.get_hostname())
                consolidatedHosts.append(cmd.get_hostname())

        return consolidatedHosts

    def build_segment_template(self):
        """Builds segment template tar file"""
        self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_STARTED', self.tempDir)
        self._create_template()
        self._fixup_template()
        self._tar_template()
        self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_DONE')

    def build_new_segments(self):
        """Deploys the template tar file and configures the new segments"""
        self.statusLogger.set_status('BUILD_SEGMENTS_STARTED', self.segTarFile)
        self._distribute_template()
        self._configure_new_segments()
        numNewSegments = len(self.gparray.getExpansionSegDbList())
        self.statusLogger.set_status('BUILD_SEGMENTS_DONE', numNewSegments)

    def _create_template(self):
        """Creates the schema template that is used by new segments"""
        self.logger.info('Creating segment template')

        if not self.noVacuumCatalog:
            self.logger.info('VACUUM FULL on the catalog tables')
            catalog.vacuum_catalog(self.dburl, self.conn, full=True, utility=True)

        MakeDirectory.local('gpexpand create temp dir', self.tempDir)

        self._select_src_segment()

        self.oldSegCount = self.gparray.get_segment_count()

        self.conn.close()

        GpStop.local('gpexpand _create_template stop gpdb', masterOnly=True, fast=True)

        # Verify that we actually stopped
        self.logger.debug('Validating array state')
        pgControlDataCmd = PgControlData('Validate stopped', self.masterDataDirectory)
        state = None
        try:
            pgControlDataCmd.run(validateAfter=True)
        except Exception, e:
            raise SegmentTemplateError(e)
        state = pgControlDataCmd.get_value('Database cluster state')
        if state != 'shut down':
            raise SegmentTemplateError('Failed to stop the array.  pg_controldata return state of %s' % state)

        try:
            masterSeg = self.gparray.master
            masterSeg.createTemplate(dstDir=self.tempDir)
        except Exception, msg:
            raise SegmentTemplateError(msg)

    def _select_src_segment(self):
        """Gets a segment to use as a source for pg_hba.conf
        and postgresql.conf files"""
        seg = self.gparray.segments[0]
        if seg.primaryDB.valid:
            self.srcSegHostname = seg.primaryDB.getSegmentHostName()
            self.srcSegDataDir = seg.primaryDB.getSegmentDataDirectory()
        elif seg.mirrorDBs[0] is not None and seg.mirrorDBs[0].valid:
            self.srcSegHostname = seg.mirrorDBs[0].getSegmentHostName()
            self.srcSegDataDir = seg.mirrorDBs[0].getSegmentDataDirectory()
        else:
            raise SegmentTemplateError("no valid segdb for content=0 to use as a template")

    def _distribute_template(self):
        """Distributes the template tar file to the new segments and expands it"""
        self.logger.info('Distributing template tar file to new hosts')

        self._distribute_tarfile()

    def _distribute_tarfile(self):
        """Distributes template tar file to hosts"""
        for host in self.hosts:
            logger.debug('Copying tar file to %s' % host)
            cpCmd = RemoteCopy('gpexpand distribute tar file to new hosts', self.schema_tar_file, host, self.segTarDir)
            self.pool.addCommand(cpCmd)

        self.pool.join()
        self.pool.check_results()

    def _configure_new_segments(self):
        """Configures new segments.  This includes modifying the postgresql.conf file
        and setting up the gp_id table"""

        self.logger.info('Configuring new segments (primary)')
        new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(),
                                                                             primaryMirror='primary')
        for host in iter(new_segment_info):
            segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
                                            tarFile=self.segTarFile, newSegments=True,
                                            verbose=gplog.logging_is_verbose(), batchSize=self.batch_size,
                                            ctxt=REMOTE, remoteHost=host)
            self.pool.addCommand(segCfgCmd)

        self.pool.join()
        self.pool.check_results()

        self.logger.info('Configuring new segments (mirror)')
        new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(),
                                                                             primaryMirror='mirror')
        for host in iter(new_segment_info):
            segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', new_segment_info[host],
                                            tarFile=self.schema_tar_file, newSegments=True,
                                            verbose=gplog.logging_is_verbose(), batchSize=self.batch_size,
                                            ctxt=REMOTE, remoteHost=host, validationOnly=True)
            self.pool.addCommand(segCfgCmd)

        self.pool.join()
        self.pool.check_results()

    def _get_transaction_filespace_dir(self, transaction_flat_file):
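        # The transaction filespace flat file is expected to hold whitespace-separated
        # "<filespace_oid> <filespace_directory>" entries; the directory from the last
        # well-formed line is returned.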
        filespace_dir = None

        with open(transaction_flat_file) as tfile:
            for line in tfile:
                fs_info = line.split()
                if len(fs_info) != 2:
                    continue
                filespace_dir = fs_info[1]

        return filespace_dir

    def _fixup_template(self):
        """Copies postgresql.conf and pg_hba.conf files from a valid segment on the system.
        Then modifies the template copy of pg_hba.conf"""

        self.logger.info('Copying postgresql.conf from existing segment into template')

        localHostname = self.gparray.master.getSegmentHostName()
        cpCmd = RemoteCopy(
            'gpexpand copying postgresql.conf to %s:%s/postgresql.conf' % (self.srcSegHostname, self.srcSegDataDir),
            self.srcSegDataDir + '/postgresql.conf', localHostname,
            self.tempDir, ctxt=REMOTE, remoteHost=self.srcSegHostname)
        cpCmd.run(validateAfter=True)

        self.logger.info('Copying pg_hba.conf from existing segment into template')
        cpCmd = RemoteCopy('gpexpand copy pg_hba.conf to %s:%s/pg_hba.conf' % (self.srcSegHostname, self.srcSegDataDir),
                           self.srcSegDataDir + '/pg_hba.conf', localHostname,
                           self.tempDir, ctxt=REMOTE, remoteHost=self.srcSegHostname)
        cpCmd.run(validateAfter=True)

        # Copy the transaction directories into template
        pg_system_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(self.gparray,
                                                                                  PG_SYSTEM_FILESPACE).run()).run()
        transaction_flat_file = os.path.join(pg_system_filespace_entries[1][2], GP_TRANSACTION_FILES_FILESPACE)
        filespace_dir = None
        if os.path.exists(transaction_flat_file):
            filespace_dir = self._get_transaction_filespace_dir(transaction_flat_file)
            logger.debug('Filespace location = %s' % filespace_dir)

            if filespace_dir:
                transaction_files_dir = ['pg_xlog', 'pg_multixact', 'pg_subtrans', 'pg_clog',
                                         'pg_distributedlog', 'pg_distributedxidmap']
                for directory in transaction_files_dir:
                    dst_dir = os.path.join(self.tempDir, directory)
                    src_dir = os.path.join(filespace_dir, directory)

                    mkCmd = MakeDirectory('gpexpand creating transaction directories in template', dst_dir)
                    mkCmd.run(validateAfter=True)
                    cpCmd = LocalDirCopy('gpexpand copying dir %s' % src_dir, src_dir, dst_dir)
                    cpCmd.run(validateAfter=True)

        # Don't need log files and gpperfmon files in template.
        rmCmd = RemoveDirectory('gpexpand remove gpperfmon data from template',
                                self.tempDir + '/gpperfmon/data')
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveDirectoryContents('gpexpand remove logs from template',
                                        self.tempDir + '/pg_log')
        rmCmd.run(validateAfter=True)

        # other files not needed
        rmCmd = RemoveFile('gpexpand remove postmaster.opts from template',
                            self.tempDir + '/postmaster.opts')
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveFile('gpexpand remove postmaster.pid from template',
                            self.tempDir + '/postmaster.pid')
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveGlob('gpexpand remove gpexpand files from template',
                            self.tempDir + '/gpexpand.*')
        rmCmd.run(validateAfter=True)

        # We don't need the flat files
        rmCmd = RemoveFile('gpexpand remove transaction flat file from template',
                            self.tempDir + '/' + GP_TRANSACTION_FILES_FILESPACE)
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveFile('gpexpand remove temporary flat file from template',
                            self.tempDir + '/' + GP_TEMPORARY_FILES_FILESPACE)
        rmCmd.run(validateAfter=True)

        self.logger.info('Adding new segments into template pg_hba.conf')
        try:
            fp = open(self.tempDir + '/pg_hba.conf', 'a')
            try:
                new_host_set = set()
                for newSeg in self.gparray.getExpansionSegDbList() + self.gparray.getDbList():
                    host = newSeg.getSegmentHostName()
                    new_host_set.add(host)

                for new_host in new_host_set:
                    addrinfo = socket.getaddrinfo(new_host, None)
                    ipaddrlist = list(set([(ai[0], ai[4][0]) for ai in addrinfo]))
                    fp.write('# %s\n' % new_host)
                    for addr in ipaddrlist:
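                        # Each generated entry looks like (illustrative address):
                        #   host    all    all    192.0.2.10/32    trust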
                        fp.write(
                            'host\tall\tall\t%s/%s\ttrust\n' % (addr[1], '32' if addr[0] == socket.AF_INET else '128'))

            finally:
                fp.close()
        except IOError:
            raise SegmentTemplateError('Failed to open %s/pg_hba.conf' % self.tempDir)
        except Exception:
            raise SegmentTemplateError('Failed to add new segments to template pg_hba.conf')

    def _tar_template(self):
        """Tars up the template files"""
        self.logger.info('Creating schema tar file')
        tarCmd = CreateTar('gpexpand tar segment template', self.tempDir, self.schema_tar_file)
        tarCmd.run(validateAfter=True)

    @staticmethod
    def cleanup_build_segment_template(tarFile, tempDir):
        """Reverts the work done by build_segment_template.  Deletes the temp
        directory and local tar file"""
        rmCmd = RemoveDirectory('gpexpand remove temp dir: %s' % tempDir, tempDir)
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveFile('gpexpand remove segment template file', tarFile)
        rmCmd.run(validateAfter=True)

    @staticmethod
    def cleanup_build_new_segments(pool, tarFile, gparray, hosts=None, removeDataDirs=False):
        """Cleans up the work done by build_new_segments.  Deletes remote tar files and
        and removes remote data directories"""

        if not hosts:
            hosts = []
            for seg in gparray.getExpansionSegDbList():
                hosts.append(seg.getSegmentHostName())

        # Remove template tar file
        for host in hosts:
            rmCmd = RemoveFile('gpexpand remove segment template file on host: %s' % host,
                               tarFile, ctxt=REMOTE, remoteHost=host)
            pool.addCommand(rmCmd)

        if removeDataDirs:
            for seg in gparray.getExpansionSegDbList():
                hostname = seg.getSegmentHostName()
                filespaces = seg.getSegmentFilespaces()
                for oid in filespaces:
                    datadir = filespaces[oid]
                    rmCmd = RemoveDirectory('gpexpand remove new segment data directory: %s:%s' % (hostname, datadir),
                                            datadir, ctxt=REMOTE, remoteHost=hostname)
                    pool.addCommand(rmCmd)
        pool.join()
        pool.check_results()

    def cleanup(self):
        """Cleans up temporary files from the local system and new segment hosts"""

        self.logger.info('Cleaning up temporary template files')
        SegmentTemplate.cleanup_build_segment_template(self.schema_tar_file, self.tempDir)
        SegmentTemplate.cleanup_build_new_segments(self.pool, self.segTarFile, self.gparray, self.hosts)


# ------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------
class NewSegmentInput:
    def __init__(self, hostname, address, port, datadir, dbid, contentId, role, replicationPort=None, fileSpaces=None):
        self.hostname = hostname
        self.address = address
        self.port = port
        self.datadir = datadir
        self.dbid = dbid
        self.contentId = contentId
        self.role = role
        self.replicationPort = replicationPort
        self.fileSpaces = fileSpaces


# ------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------
class gpexpand:
    def __init__(self, logger, gparray, dburl, options, parallel=1):
        self.pastThePointOfNoReturn = False
        self.logger = logger
        self.dburl = dburl
        self.options = options
        self.numworkers = parallel
        self.gparray = gparray
        self.unique_index_tables = {}
        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods='dml')
        self.old_segments = self.gparray.getSegDbList()
        if dburl.pgdb == 'template0' or dburl.pgdb == 'template1' or dburl.pgdb == 'postgres':
            raise ExpansionError("Invalid database '%s' specified.  Cannot use a template database.\n"
                                 "Please set the environment variable PGDATABASE to a different "
                                 "database or use the -D option to specify a database and re-run" % dburl.pgdb)

        datadir = self.gparray.master.getSegmentDataDirectory()
        self.statusLogger = GpExpandStatus(logger=logger,
                                           master_data_directory=datadir,
                                           master_mirror=self.gparray.standbyMaster)

        # Adjust batch size if it's too high given the number of segments
        seg_count = len(self.old_segments)
        if self.options.batch_size > seg_count:
            self.options.batch_size = seg_count
        self.pool = WorkerPool(numWorkers=self.options.batch_size)

        self.tempDir = self.statusLogger.get_temp_dir()
        if not self.tempDir:
            self.tempDir = createTempDirectoryName(self.options.master_data_directory, "gpexpand")
        self.queue = None
        self.segTemplate = None
        pass

    @staticmethod
    def prepare_gpdb_state(logger, dburl, options):
        """ Gets GPDB in the appropriate state for an expansion.
        This state will depend on if this is a new expansion setup,
        a continuation of a previous expansion or a rollback """
        # Get the database in the expected state for the expansion/rollback
        status_file_exists = os.path.exists(options.master_data_directory + '/gpexpand.status')
        gpexpand_db_status = None

        if status_file_exists:
            # gpexpand status file exists so the last run of gpexpand didn't finish properly
            gpexpand.get_gpdb_in_state(GPDB_UTILITY, options)
        else:
            gpexpand.get_gpdb_in_state(GPDB_STARTED, options)

            logger.info('Querying gpexpand schema for current expansion state')
            try:
                gpexpand_db_status = gpexpand.get_status_from_db(dburl, options)
            except Exception, e:
                raise Exception('Error while trying to query the gpexpand schema: %s' % e)
            logger.debug('Expansion status returned is %s' % gpexpand_db_status)

            if (not gpexpand_db_status and options.filename) and not options.clean:
                # New expansion, need to be in master only
                logger.info('Readying Greenplum Database for a new expansion')
                gpexpand.get_gpdb_in_state(GPDB_UTILITY, options)

        return gpexpand_db_status

    @staticmethod
    def get_gpdb_in_state(state, options):
        runningStatus = chk_local_db_running(options.master_data_directory, options.pgport)
        gpdb_running = runningStatus[0] and runningStatus[1] and runningStatus[2] and runningStatus[3]
        if gpdb_running:
            gpdb_mode = get_local_db_mode(options.master_data_directory)

        if state == GPDB_STARTED:
            if gpdb_running:
                if gpdb_mode != 'UTILITY':
                    return
                else:
                    GpStop.local('Stop GPDB', masterOnly=True, fast=True)
            GpStart.local('Start GPDB')
        elif state == GPDB_STOPPED:
            if gpdb_running:
                if gpdb_mode != 'UTILITY':
                    GpStop.local('Stop GPDB', fast=True)
                else:
                    GpStop.local('Stop GPDB', masterOnly=True, fast=True)
        elif state == GPDB_UTILITY:
            if gpdb_running:
                if gpdb_mode == 'UTILITY':
                    return
                GpStop.local('Stop GPDB', fast=True)
            GpStart.local('Start GPDB in master only mode', masterOnly=True)
        else:
            raise Exception('Unknown gpdb state')

    @staticmethod
    def get_status_from_db(dburl, options):
        """Gets gpexpand status from the gpexpand schema"""
        status_conn = None
        gpexpand_db_status = None
        if get_local_db_mode(options.master_data_directory) == 'NORMAL':
            try:
                status_conn = dbconn.connect(dburl, encoding='UTF8')
                # Get the last status entry
                cursor = dbconn.execSQL(status_conn, 'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
                if cursor.rowcount == 1:
                    gpexpand_db_status = cursor.fetchone()[0]

            except Exception:
                # expansion schema doesn't exist or there was a connection failure.
                pass
            finally:
                if status_conn: status_conn.close()

        # make sure gpexpand schema doesn't exist since it wasn't in DB provided
        if not gpexpand_db_status:
            """
            MPP-14145 - If there's no discernible status, the schema must not exist.

            The checks in get_status_from_db claim to look for existence of the 'gpexpand' schema, but more accurately they're
            checking for non-emptiness of the gpexpand.status table. If the table were empty, but the schema did exist, gpexpand would presume
            a new expansion was taking place and it would try to CREATE SCHEMA later, which would fail. So, here, if this is the case, we error out.

            Note: -c/--clean will not necessarily work either, as it too has assumptions about the non-emptiness of the gpexpand schema.
            """
            with dbconn.connect(dburl, encoding='UTF8', utility=True) as conn:
                count = dbconn.execSQLForSingleton(conn,
                                                   "SELECT count(n.nspname) FROM pg_catalog.pg_namespace n WHERE n.nspname = 'gpexpand'")
                if count > 0:
                    raise ExpansionError(
                        "Existing expansion state could not be determined, but a gpexpand schema already exists. Cannot proceed.")

            # now determine whether gpexpand schema merely resides in another DB
            status_conn = dbconn.connect(dburl, encoding='UTF8')
            db_list = catalog.getDatabaseList(status_conn)
            status_conn.close()

            for db in db_list:
                dbname = db[0]
                if dbname in ['template0', 'template1', 'postgres', dburl.pgdb]:
                    continue
                logger.debug('Looking for gpexpand schema in %s' % dbname.decode('utf-8'))
                test_url = copy.deepcopy(dburl)
                test_url.pgdb = dbname
                c = dbconn.connect(test_url, encoding='UTF8')
                try:
                    cursor = dbconn.execSQL(c, 'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
                except:
                    # Not in here
                    pass
                else:
                    raise ExpansionError("""gpexpand schema exists in database %s, not in %s.
Set PGDATABASE or use the -D option to specify the correct database to use.""" % (
                        dbname.decode('utf-8'), options.database))
                finally:
                    if c:
                        c.close()

        return gpexpand_db_status

    def validate_max_connections(self):
        try:
            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
            max_connections = int(catalog.getSessionGUC(conn, 'max_connections'))
        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error('Failed to check max_connections GUC')
            if conn: conn.close()
            raise ex

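        # Heuristic (assumption inferred from the check below): roughly two connections
        # per concurrently expanding table plus one control connection, hence 2 * parallel + 1.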
        if max_connections < self.options.parallel * 2 + 1:
            self.logger.error('max_connections is too small to expand %d tables at' % self.options.parallel)
            self.logger.error('a time.  This will lead to connection errors.  Either')
            self.logger.error('reduce the value for -n passed to gpexpand or raise')
            self.logger.error('max_connections in postgresql.conf')
            return False

        return True

    def validate_unalterable_tables(self):
        conn = None
        unalterable_tables = []

        try:
            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
            databases = catalog.getDatabaseList(conn)
            conn.close()

            tempurl = copy.deepcopy(self.dburl)
            for db in databases:
                if db[0] == 'template0':
                    continue
                self.logger.info('Checking database %s for unalterable tables...' % db[0].decode('utf-8'))
                tempurl.pgdb = db[0]
                conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
                cursor = dbconn.execSQL(conn, unalterable_table_sql)
                for row in cursor:
                    unalterable_tables.append(row)
                cursor.close()
                conn.close()

        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error('Failed to check for unalterable tables.')
            if conn: conn.close()
            raise ex

        if len(unalterable_tables) > 0:
            self.logger.error('The following tables cannot be altered because they contain')
            self.logger.error('dropped columns of user defined types:')
            for t in unalterable_tables:
                self.logger.error('\t%s.%s' % (t[0].decode('utf-8'), t[1].decode('utf-8')))
            self.logger.error('Please consult the documentation for instructions on how to')
            self.logger.error('correct this issue, then run gpexpand again')
            return False

        return True

    def check_unique_indexes(self):
        """ Checks if there are tables with unique indexes.
        Returns true if unique indexes exist"""

        conn = None
        has_unique_indexes = False

        try:
            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
            databases = catalog.getDatabaseList(conn)
            conn.close()

            tempurl = copy.deepcopy(self.dburl)
            for db in databases:
                if db[0] == 'template0':
                    continue
                self.logger.info('Checking database %s for tables with unique indexes...' % db[0].decode('utf-8'))
                tempurl.pgdb = db[0]
                conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
                cursor = dbconn.execSQL(conn, has_unique_index_sql)
                for row in cursor:
                    has_unique_indexes = True
                    self.unique_index_tables[row[0]] = True
                cursor.close()
                conn.close()

        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error('Failed to check for unique indexes.')
            if conn: conn.close()
            raise ex

        return has_unique_indexes

    def rollback(self, dburl):
        """Rolls back and expansion setup that didn't successfully complete"""
        cleanSchema = False
        status_history = self.statusLogger.get_status_history()
        if not status_history:
            raise ExpansionError('No status history to rollback.')

        if (status_history[-1])[0] == 'EXPANSION_PREPARE_DONE':
            raise ExpansionError('Expansion preparation complete.  Nothing to rollback')

        for status in reversed(status_history):
            if status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
                if self.statusLogger.is_standby():
                    self.logger.info('Running on standby master, skipping segment template rollback')
                    continue
                self.logger.info('Rolling back segment template build')
                SegmentTemplate.cleanup_build_segment_template('gpexpand_schema.tar', status[1])

            elif status[0] == 'BUILD_SEGMENTS_STARTED':
                self.logger.info('Rolling back building of new segments')
                newSegList = self.read_input_files(self.statusLogger.get_input_filename())
                self.addNewSegments(newSegList)
                SegmentTemplate.cleanup_build_new_segments(self.pool,
                                                           self.statusLogger.get_seg_tarfile(),
                                                           self.gparray, removeDataDirs=True)

            elif status[0] == 'UPDATE_OLD_SEGMENTS_STARTED':
                self.logger.info('Rolling back update of original segments')
                self.restore_original_segments()

            elif status[0] == 'UPDATE_CATALOG_STARTED':
                self.logger.info('Rolling back master update')
                self.restore_master()
                self.gparray = GpArray.initFromCatalog(dburl, utility=True)

            elif status[0] == 'SETUP_EXPANSION_SCHEMA_STARTED':
                cleanSchema = True
            else:
                self.logger.debug('Skipping %s' % status[0])

        self.conn.close()

        GpStop.local('gpexpand rollback', masterOnly=True, fast=True)

        if cleanSchema:
            GpStart.local('gpexpand rollback start database restricted', restricted=True)
            self.logger.info('Dropping expansion schema')
            schema_conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods='dml')
            try:
                dbconn.execSQL(schema_conn, drop_schema_sql)
                schema_conn.commit()
                schema_conn.close()
            except:
                pass  # schema wasn't created yet.
            GpStop.local('gpexpand rollback stop database', fast=True)

        self.statusLogger.remove_status_file()
        self.statusLogger.remove_segment_configuration_backup_file()

    def get_state(self):
        """Returns expansion state from status logger"""
        return self.statusLogger.get_current_status()[0]

    def generate_inputfile(self):
        """Writes a gpexpand input file based on expansion segments
        added to gparray by the gpexpand interview"""
        outputfile = 'gpexpand_inputfile_' + strftime("%Y%m%d_%H%M%S")
        outfile = open(outputfile, 'w')

        logger.info("Generating input file...")

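        # One line per new segment, in the format read back by read_input_files():
        #   hostname:address:port:datadir:dbid:contentId:preferred_role[:replication_port]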
        for db in self.gparray.getExpansionSegDbList():
            tempStr = "%s:%s:%d:%s:%d:%d:%s" % (canonicalize_address(db.getSegmentHostName())
                                                , canonicalize_address(db.getSegmentAddress())
                                                , db.getSegmentPort()
                                                , db.getSegmentDataDirectory()
                                                , db.getSegmentDbId()
                                                , db.getSegmentContentId()
                                                , db.getSegmentPreferredRole()
                                                )
            if db.getSegmentReplicationPort() != None:
                tempStr = tempStr + ':' + str(db.getSegmentReplicationPort())
            outfile.write(tempStr + "\n")

        outfile.close()

        return outputfile

    # ------------------------------------------------------------------------
    def generate_filespaces_inputfile(self, outFileNamePrefix):
        """
        Writes a gpexpand filespace input file based on expansion segments
        added to gparray by the gpexpand interview. If the new segments
        contain filespaces, then return the name of the file, else return
        None.
        """
        filespaces = self.gparray.getFilespaces(includeSystemFilespace=False)
        if filespaces != None and len(filespaces) > 0:
            outputfile = outFileNamePrefix + FILE_SPACES_INPUT_FILENAME_SUFFIX
        else:
            outputfile = None

        if outputfile != None:
            outfileFD = open(outputfile, 'w')

            logger.info("Generating filespaces input file...")

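            # Line 1 lists the filespace names ("filespaceOrder=fs1:fs2:...");
            # each following line gives a new segment's dbid and its path for every listed filespace.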
            firstLine = FILE_SPACES_INPUT_FILE_LINE_1_PREFIX + "="
            firstFs = True
            for fs in filespaces:
                if firstFs == True:
                    firstLine = firstLine + fs.getName()
                    firstFs = False
                else:
                    firstLine = firstLine + ":" + fs.getName()
            outfileFD.write(firstLine + '\n')

            for db in self.gparray.getExpansionSegDbList():
                dbid = db.getSegmentDbId()
                outLine = str(dbid)
                segmentFilespaces = db.getSegmentFilespaces()
                for fs in filespaces:
                    oid = fs.getOid()
                    path = segmentFilespaces[oid]
                    outLine = outLine + ":" + path
                outfileFD.write(outLine + '\n')

            outfileFD.close()

        return outputfile

    def addNewSegments(self, inputFileEntryList):
        for seg in inputFileEntryList:
            self.gparray.addExpansionSeg(content=int(seg.contentId)
                                         , preferred_role=seg.role
                                         , dbid=int(seg.dbid)
                                         , role=seg.role
                                         , hostname=seg.hostname.strip()
                                         , address=seg.address.strip()
                                         , port=int(seg.port)
                                         , datadir=os.path.abspath(seg.datadir.strip())
                                         , replication_port=seg.replicationPort
                                         , fileSpaces=seg.fileSpaces
                                         )
        try:
            self.gparray.validateExpansionSegs()
        except Exception, e:
            raise ExpansionError('Invalid input file: %s' % e)

    def read_input_files(self, inputFilename=None):
        """Reads and validates line format of the input file passed
        in on the command line via the -i arg"""

        retValue = []

        if not self.options.filename and not inputFilename:
            raise ExpansionError('Missing input file')

        if self.options.filename:
            inputFilename = self.options.filename
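        # If user-defined filespaces exist, a companion "<inputfile>.fs" file is
        # required; it maps each new segment dbid to its filespace locations.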
        fsInputFilename = inputFilename + FILE_SPACES_INPUT_FILENAME_SUFFIX
        fsOidList = []
        fsDictionary = {}
        f = None
        try:
            existsCmd = FileDirExists(name="gpexpand see if .fs file exists", directory=fsInputFilename)
            existsCmd.run(validateAfter=True)
            exists = existsCmd.filedir_exists()
            if exists == False and len(self.gparray.getFilespaces(includeSystemFilespace=False)) != 0:
                raise ExpansionError("Expecting filespaces input file: " + fsInputFilename)
            if exists == True:
                f = open(fsInputFilename, 'r')
                for lineNumber, l in line_reader(f):
                    if lineNumber == 1:
                        fsNameString = l.strip().split("=")
                        fsNameList = fsNameString[1].strip().split(":")
                        for name in fsNameList:
                            oid = self.gparray.getFileSpaceOid(name)
                            if oid == None:
                                raise ExpansionError("Unknown filespace name: " + str(name))
                            fsOidList.append(oid)
                        # Make sure all the filespace names are specified.
                        if len(fsNameList) != len(self.gparray.getFilespaces(includeSystemFilespace=False)):
                            missingFsNames = []
                            filespaces = self.gparray.getFilespaces()
                            for fs in filespaces:
                                if fs.getName() not in fsNameList:
                                    missingFsNames.append(fs.getName())
                            raise ExpansionError("Missing filespaces: " + str(missingFsNames))

                    else:
                        fsLine = l.strip().split(":")
                        try:
                            fsDictionary[fsLine[0]] = fsLine[1:]
                        except Exception, e:
                            raise ExpansionError("Problem with inputfile %s, line number %s, exceptin %s." % \
                                                 (fsInputFilename, str(lineNumber), str(e)))

        except IOError, ioe:
            raise ExpansionError('Problem with filespace input file: %s. Exception: %s' % (fsInputFilename, str(ioe)))
        finally:
            if f != None:
                f.close()

        try:
            f = open(inputFilename, 'r')
            try:
                for line, l in line_reader(f):

                    hostname, address, port, datadir, dbid, contentId, role, replicationPort \
                        = parse_gpexpand_segment_line(inputFilename, line, l)

                    filespaces = {}
                    if len(fsDictionary) > 0:
                        fileSpacesPathList = fsDictionary[dbid]
                    else:
                        fileSpacesPathList = []
                    index = 0
                    for oid in fsOidList:
                        filespaces[oid] = fileSpacesPathList[index]
                        index = index + 1

                    # Check that input values look reasonable.
                    if hostname == None or len(hostname) == 0:
                        raise ExpansionError("Invalid host name on line " + str(line))
                    if address == None or len(address) == 0:
                        raise ExpansionError("Invaid address on line " + str(line))
                    if port == None or str(port).isdigit() == False or int(port) < 0:
                        raise ExpansionError("Invalid port number on line " + str(line))
                    if datadir == None or len(datadir) == 0:
                        raise ExpansionError("Invalid data directory on line " + str(line))
                    if dbid == None or str(dbid).isdigit() == False or int(dbid) < 0:
                        raise ExpansionError("Invalid dbid on line " + str(line))
                    if contentId == None or str(contentId).isdigit() == False or int(contentId) < 0:
                        raise ExpansionError("Invalid contentId on line " + str(line))
                    if role == None or len(role) > 1 or (role != 'p' and role != 'm'):
                        raise ExpansionError("Invalid role on line " + str(line))
                    if replicationPort != None and int(replicationPort) < 0:
                        raise ExpansionError("Invalid replicationPort on line " + str(line))

                    retValue.append(NewSegmentInput(hostname=hostname
                                                    , port=port
                                                    , address=address
                                                    , datadir=datadir
                                                    , dbid=dbid
                                                    , contentId=contentId
                                                    , role=role
                                                    , replicationPort=replicationPort
                                                    , fileSpaces=filespaces
                                                    ))
            except ValueError:
                raise ExpansionError('Missing or invalid value on line %d.' % line)
            except Exception, e:
                raise ExpansionError('Invalid input file on line %d: %s' % (line, str(e)))
            finally:
                f.close()
            return retValue
        except IOError:
            raise ExpansionError('Input file %s not found' % inputFilename)

    def add_segments(self):
        """Starts the process of adding the new segments to the array"""
        self.segTemplate = SegmentTemplate(logger=self.logger,
                                           statusLogger=self.statusLogger,
                                           pool=self.pool,
                                           gparray=self.gparray,
                                           masterDataDirectory=self.options.master_data_directory,
                                           dburl=self.dburl,
                                           conn=self.conn,
                                           noVacuumCatalog=self.options.novacuum,
                                           tempDir=self.tempDir,
                                           segTarDir=self.options.tardir,
                                           batch_size=self.options.batch_size)
        try:
            self.segTemplate.build_segment_template()
            self.segTemplate.build_new_segments()
        except SegmentTemplateError, msg:
            raise ExpansionError(msg)

    def update_original_segments(self):
        """Updates the pg_hba.conf file and updates the gp_id catalog table
        of existing hosts"""
        self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_STARTED', self.gparray.get_primary_count())

        self.logger.info('Backing up pg_hba.conf file on original segments')

        # backup pg_hba.conf file on original segments
        for seg in self.old_segments:
            if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
                continue

            hostname = seg.getSegmentHostName()
            datadir = seg.getSegmentDataDirectory()

            srcFile = datadir + '/pg_hba.conf'
            dstFile = datadir + '/pg_hba.gpexpand.bak'
            cpCmd = RemoteCopy('gpexpand back up pg_hba.conf file on original segments',
                               srcFile, hostname, dstFile, ctxt=REMOTE, remoteHost=hostname)

            self.pool.addCommand(cpCmd)

        self.pool.join()

        try:
            self.pool.check_results()
        except ExecutionError, msg:
            raise ExpansionError('Failed to configure original segments: %s' % msg)

        # Copy the new pg_hba.conf file to original segments
        self.logger.info('Copying new pg_hba.conf file to original segments')
        for seg in self.old_segments:
            if seg.isSegmentQD() or seg.getSegmentStatus() != 'u':
                continue

            hostname = seg.getSegmentHostName()
            datadir = seg.getSegmentDataDirectory()

            cpCmd = RemoteCopy('gpexpand copy new pg_hba.conf file to original segments',
                               self.tempDir + '/pg_hba.conf', hostname, datadir)

            self.pool.addCommand(cpCmd)

        self.pool.join()

        try:
            self.pool.check_results()
        except ExecutionError, msg:
            raise ExpansionError('Failed to configure original segments: %s' % msg)

        # Update the gp_id of original segments
        self.newPrimaryCount = 0
        for seg in self.gparray.getExpansionSegDbList():
            if seg.isSegmentPrimary(False):
                self.newPrimaryCount += 1

        self.newPrimaryCount += self.gparray.get_primary_count()

        self.logger.info('Configuring original segments')

        if self.segTemplate:
            self.segTemplate.cleanup()

        self.statusLogger.set_status('UPDATE_OLD_SEGMENTS_DONE')

    def restore_original_segments(self):
        """ Restores the original segments back to their state prior the expansion
        setup.  This is only possible if the expansion setup has not completed
        successfully."""
        self.logger.info('Restoring original segments')
        gp_segment_configuration_backup_file = self.statusLogger.get_gp_segment_configuration_backup()
        if gp_segment_configuration_backup_file:
            originalArray = GpArray.initFromFile(self.statusLogger.get_gp_segment_configuration_backup())
        else:
            originalArray = self.gparray

        # Restore pg_hba.conf file from backup
        self.logger.info('Restoring pg_hba.conf file on original segments')
        for seg in originalArray.getSegDbList():
            datadir = seg.getSegmentDataDirectory()
            hostname = seg.getSegmentHostName()

            srcFile = datadir + '/pg_hba.gpexpand.bak'
            dstFile = datadir + '/pg_hba.conf'
            cpCmd = RemoteCopy('gpexpand restore of pg_hba.conf file on original segments',
                               srcFile, hostname, dstFile, ctxt=REMOTE,
                               remoteHost=hostname)

            self.pool.addCommand(cpCmd)

        self.pool.join()

        try:
            self.pool.check_results()
        except:
            # Setup didn't get this far so no backup to restore.
            self.pool.empty_completed_items()

        # note: this code may not be needed -- it will NOT change gp_id
        #       However, the call to gpconfigurenewsegment may still be doing some needed work (stopping the segment)
        #       which could be unnecessary or could be moved here)
        self.logger.info('Restoring original segments catalog tables')
        orig_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(originalArray.getSegDbList())
        for host in iter(orig_segment_info):
            segCfgCmd = ConfigureNewSegment('gpexpand configure new segments', orig_segment_info[host],
                                            verbose=gplog.logging_is_verbose(), batchSize=self.options.batch_size,
                                            ctxt=REMOTE, remoteHost=host)
            self.pool.addCommand(segCfgCmd)

        self.pool.join()

        try:
            self.pool.check_results()
        except ExecutionError:
            raise ExpansionError('Failed to restore original segments')

    def _construct_filespace_parameter(self, seg, gpFSobjList):
        """ return a string containing a filespace parameter appropriate for use in sql functions. """
        filespaces = []
        segFilespaces = seg.getSegmentFilespaces()
        filespaceNames = []
        filespaceLocations = []
        for entry in gpFSobjList:
            name = entry.getName()
            oid = entry.getOid()
            location = segFilespaces[oid]
            filespaceNames.append(name)
            filespaceLocations.append(location)
        for i in range(len(filespaceNames)):
            entry = [filespaceNames[i], filespaceLocations[i]]
            filespaces.append(entry)
        return str(filespaces)

    def update_catalog(self):
        """
        Starts the database, calls updateSystemConfig() to setup
        the catalog tables and get the actual dbid and content id
        for the new segments.
        """
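        # Back up the current gp_segment_configuration first so a failed catalog
        # update can be rolled back from this file (see restore_master).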
        self.statusLogger.set_gp_segment_configuration_backup(
            self.options.master_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
        self.gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
        self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())

        self.logger.info('Starting Greenplum Database in restricted mode')
        startCmd = GpStart('gpexpand update master start database restricted mode', restricted=True, verbose=True)
        startCmd.run(validateAfter=True)

        # Put expansion segment primaries in change tracking
        for seg in self.gparray.getExpansionSegDbList():
            if seg.isSegmentMirror() == True:
                continue
            if self.gparray.get_mirroring_enabled() == True:
                seg.setSegmentMode(MODE_CHANGELOGGING)

        # Set expansion segment mirror state = down
        for seg in self.gparray.getExpansionSegDbList():
            if seg.isSegmentPrimary() == True:
                continue
            seg.setSegmentStatus(STATUS_DOWN)

        # Update the catalog
        configurationInterface.getConfigurationProvider().updateSystemConfig(
            self.gparray,
            "%s: segment config for resync" % getProgramName(),
            dbIdToForceMirrorRemoveAdd={},
            useUtilityMode=True,
            allowPrimary=True
        )

        # The content IDs may have changed, so we must make sure the array is in proper order.
        self.gparray.reOrderExpansionSegs()

        # Issue checkpoint due to forced shutdown below
        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        dbconn.execSQL(self.conn, "CHECKPOINT")
        self.conn.close()

        self.logger.info('Stopping database')
        stopCmd = GpStop('gpexpand update master stop database', verbose=True, ctxt=LOCAL, force=True)
        # We do not check the results of GpStop because we will get errors for all the new segments.
        stopCmd.run(validateAfter=False)

        self.statusLogger.set_status('UPDATE_CATALOG_DONE')

    # --------------------------------------------------------------------------
    def configure_new_segment_filespaces(self):
        """
        This method is called after all new segments have been configured.
        """

        self.logger.info('Configuring new segment filespaces')
        newSegments = self.gparray.getExpansionSegDbList()
        fsObjList = self.gparray.getFilespaces(includeSystemFilespace=False)

        if len(fsObjList) == 0:
            # No filespaces to configure
            return

        """ Connect to the back end of each new segment directly, and call the filespace setup function. """
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            name = "gpexpand prep new segment filespaces. host = %s, sysdatadir = %s" % (
                seg.getSegmentHostName(), seg.getSegmentDataDirectory())
            segFilespaces = seg.getSegmentFilespaces()
            filespaceNames = []
            filespaceLocations = []
            for entry in fsObjList:
                fsname = entry.getName()
                oid = entry.getOid()
                location = segFilespaces[oid]
                filespaceNames.append(fsname)
                filespaceLocations.append(location)
            prepCmd = PrepFileSpaces(name=name
                                     , filespaceNames=filespaceNames
                                     , filespaceLocations=filespaceLocations
                                     , sysDataDirectory=seg.getSegmentDataDirectory()
                                     , dbid=seg.getSegmentDbId()
                                     , contentId=seg.getSegmentContentId()
                                     , ctxt=REMOTE
                                     , remoteHost=seg.getSegmentHostName()
                                     )
            self.pool.addCommand(prepCmd)
        self.pool.join()
        self.pool.check_results()

    # --------------------------------------------------------------------------
    def cleanup_new_segments(self):
        """
        This method is called after all new segments have been configured.
        """

        self.logger.info('Cleaning up databases in new segments.')
        newSegments = self.gparray.getExpansionSegDbList()

        """ Get a list of databases. """
        self.logger.info('Starting master in utility mode')

        startCmd = GpStart('gpexpand update master start database master only', masterOnly=True)
        startCmd.run(validateAfter=True)

        conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        databases = catalog.getDatabaseList(conn)
        conn.close()

        self.logger.info('Stopping master in utility mode')
        GpStop.local('gpexpand update master stop database', masterOnly=True, fast=True)

        """
        Connect to each database in each segment and clean up catalog tables that still contain
        master-only data as a result of copying the segment from the master.
        Note, this functionality used to be in segcopy and was therefore done just once to the original
        copy of the master. We need to do it separately for each segment now since filespaces may
        contain the databases.
        """
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            """ Start all the new segments in utilty mode. """
            segStartCmd = SegmentStart(
                name="Starting new segment dbid %s on host %s." % (str(seg.getSegmentDbId()), seg.getSegmentHostName())
                , gpdb=seg
                , numContentsInCluster=0  # Starting seg on its own.
                , era=None
                , mirrormode=MIRROR_MODE_MIRRORLESS
                , utilityMode=True
                , ctxt=REMOTE
                , remoteHost=seg.getSegmentHostName()
                , noWait=False
                , timeout=SEGMENT_TIMEOUT_DEFAULT)
            self.pool.addCommand(segStartCmd)
        self.pool.join()
        self.pool.check_results()

        """
        Build the list of delete statements based on the MASTER_ONLY_TABLES
        defined in gpcatalog.py
        """
        statements = ["delete from pg_catalog.%s" % tab for tab in MASTER_ONLY_TABLES]

        """
          Connect to each database in the new segments, and clean up the catalog tables.
        """
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            for database in databases:
                if database[0] == 'template0':
                    continue
                dburl = dbconn.DbURL(hostname=seg.getSegmentHostName()
                                     , port=seg.getSegmentPort()
                                     , dbname=database[0]
                                     )
                name = "gpexpand execute segment cleanup commands. seg dbid = %s, command = %s" % (
                    seg.getSegmentDbId(), str(statements))
                execSQLCmd = ExecuteSQLStatementsCommand(name=name
                                                         , url=dburl
                                                         , sqlCommandList=statements
                                                         )
                self.pool.addCommand(execSQLCmd)
                self.pool.join()
                ### need to fix self.pool.check_results(). Call getCompletedItems to clear the queue for now.
                self.pool.check_results()
                self.pool.getCompletedItems()

        """
        Stop all the new segments.
        """
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            segStopCmd = SegmentStop(
                name="Stopping new segment dbid %s on host %s." % (str(seg.getSegmentDbId), seg.getSegmentHostName())
                , dataDir=seg.getSegmentDataDirectory()
                , mode='smart'
                , nowait=False
                , ctxt=REMOTE
                , remoteHost=seg.getSegmentHostName()
            )
            self.pool.addCommand(segStopCmd)
        self.pool.join()
        self.pool.check_results()

        self.logger.info('Starting Greenplum Database in restricted mode')
        startCmd = GpStart('gpexpand update master start database restricted', restricted=True, verbose=True)
        startCmd.run(validateAfter=True)

        # Need to restore the connection used by the expansion
        self.conn = dbconn.connect(self.dburl, encoding='UTF8')

    # --------------------------------------------------------------------------
    def restore_master(self):
        """Restores the gp_segment_configuration catalog table for rollback"""
        backupFile = self.statusLogger.get_gp_segment_configuration_backup()

        if not os.path.exists(backupFile):
            raise ExpansionError('gp_segment_configuration backup file %s does not exist' % backupFile)

        # Create a new gpArray from the backup file
        array = GpArray.initFromFile(backupFile)

        originalDbIds = ""
        originalDbIdsList = []
        first = True
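        # Build a comma-separated list of the dbids that existed before the expansion;
        # segments whose dbid is not in this list were added by gpexpand and will be removed.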
        for seg in array.getDbList():
            originalDbIdsList.append(int(seg.getSegmentDbId()))
            if first == False:
                originalDbIds += ", "
            first = False
            originalDbIds += str(seg.getSegmentDbId())

        if len(originalDbIds) > 0:
            # Update the catalog with the contents of the backup
            restore_conn = None
            try:
                restore_conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods='dml')

                # Get a list of all the expand primary segments
                sqlStr = "select dbid from pg_catalog.gp_segment_configuration where dbid not in (%s) and role = 'p'" % str(
                    originalDbIds)
                curs = dbconn.execSQL(restore_conn, sqlStr)
                deleteDbIdList = []
                rows = curs.fetchall()
                for row in rows:
                    deleteDbIdList.append(int(row[0]))

                # Get a list of all the expand mirror segments
                sqlStr = "select content from pg_catalog.gp_segment_configuration where dbid not in (%s) and role = 'm'" % str(
                    originalDbIds)
                curs = dbconn.execSQL(restore_conn, sqlStr)
                deleteContentList = []
                rows = curs.fetchall()
                for row in rows:
                    deleteContentList.append(int(row[0]))

                #
                # The following is a sanity check to make sure we don't do something bad here.
                #
                if len(originalDbIdsList) < 2:
                    self.logger.error(
                        "The original DB DIS list is to small to be correct: %s " % str(len(originalDbIdsList)))
                    raise Exception("Unable to complete rollback")

                totalToDelete = len(deleteDbIdList) + len(deleteContentList)
                if int(totalToDelete) > int(self.statusLogger.get_number_new_segments()):
                    self.logger.error(
                        "There was a discrepancy between the number of expand segments to rollback (%s), and the expected number of segment to rollback (%s)" \
                        % (str(totalToDelete), str(self.statusLogger.get_number_new_segments())))
                    self.logger.error("  Expanded primary segment dbids = %s", str(deleteDbIdList))
                    self.logger.error("  Expansion mirror content ids   = %s", str(deleteContentList))
                    raise Exception("Unable to complete rollback")

                for content in deleteContentList:
                    sqlStr = "select * from gp_remove_segment_mirror(%s::smallint)" % str(content)
                    dbconn.execSQL(restore_conn, sqlStr)

                for dbid in deleteDbIdList:
                    sqlStr = "select * from gp_remove_segment(%s::smallint)" % str(dbid)
                    dbconn.execSQL(restore_conn, sqlStr)

                restore_conn.commit()
            except Exception, e:
                raise Exception("Unable to restore master. Exception: " + str(e))
            finally:
                if restore_conn != None:
                    restore_conn.close()

    def sync_new_mirrors(self):
        """ This method will execute gprecoverseg so that all new segments sync with their mirrors."""
        if self.gparray.get_mirroring_enabled() == True:
            self.logger.info('Starting new mirror segment synchronization')
            cmd = GpRecoverSeg(name="gpexpand syncing mirrors", options="-a -F")
            cmd.run(validateAfter=True)

    def start_prepare(self):
        """Inserts into gpexpand.status that expansion preparation has started."""
        if self.options.filename:
            self.statusLogger.create_status_file()
            self.statusLogger.set_status('EXPANSION_PREPARE_STARTED', os.path.abspath(self.options.filename))

    def finalize_prepare(self):
        """Removes the gpexpand status file and segment configuration backup file"""
        self.statusLogger.remove_status_file()
        self.statusLogger.remove_segment_configuration_backup_file()
        self.pastThePointOfNoReturn = True

    def setup_schema(self):
        """Used to setup the gpexpand schema"""
        self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_STARTED')
        self.logger.info('Creating expansion schema')
        dbconn.execSQL(self.conn, create_schema_sql)
        dbconn.execSQL(self.conn, status_table_sql)
        dbconn.execSQL(self.conn, status_detail_table_sql)

        # views
        if not self.options.simple_progress:
            dbconn.execSQL(self.conn, progress_view_sql)
        else:
            dbconn.execSQL(self.conn, progress_view_simple_sql)

        self.conn.commit()

        self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_DONE')

    def prepare_schema(self):
        """Prepares the gpexpand schema"""
        self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_STARTED')

        if not self.conn:
            self.conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods='dml')
            self.gparray = GpArray.initFromCatalog(self.dburl)

        nowStr = datetime.datetime.now()
        statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP', '%s' ) " % (gpexpand_schema, status_table, nowStr)

        dbconn.execSQL(self.conn, statusSQL)

        db_list = catalog.getDatabaseList(self.conn)

        for db in db_list:
            dbname = db[0]
            if dbname == 'template0':
                continue
            self.logger.info('Populating %s.%s with data from database %s' % (
                gpexpand_schema, status_detail_table, dbname.decode('utf-8')))
            self._populate_regular_tables(dbname)
            self._populate_partitioned_tables(dbname)
            inject_fault('gpexpand MPP-14620 fault injection')
            self._update_distribution_policy(dbname)

        nowStr = datetime.datetime.now()
        statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP DONE', '%s' ) " % (gpexpand_schema, status_table, nowStr)
        dbconn.execSQL(self.conn, statusSQL)

        self.conn.commit()

        self.conn.close()

        self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_DONE')
        self.statusLogger.set_status('EXPANSION_PREPARE_DONE')

        # At this point, no rollback is possible and the system
        # including new segments has been started once before so finalize
        self.finalize_prepare()

        self.logger.info('Stopping Greenplum Database')
        GpStop.local('gpexpand setup complete', fast=True)

    def _populate_regular_tables(self, dbname):
        """ we don't do 3.2+ style partitioned tables here, but we do
            all other table types.
        """

        src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))"
        sql = """SELECT
    n.nspname || '.' || c.relname as fq_name,
    n.oid as schemaoid,
    c.oid as tableoid,
    p.attrnums as distribution_policy,
    now() as last_updated,
    %s
FROM
            pg_class c
    JOIN pg_namespace n ON (c.relnamespace=n.oid)
    JOIN pg_catalog.gp_distribution_policy p on (c.oid = p.localoid)
    LEFT JOIN pg_partition pp on (c.oid=pp.parrelid)
    LEFT JOIN pg_partition_rule pr on (c.oid=pr.parchildrelid)
WHERE
    pp.parrelid is NULL
    AND pr.parchildrelid is NULL
    AND n.nspname != 'gpexpand'
    AND n.nspname != 'pg_bitmapindex'
    AND c.relstorage != 'x';

                  """ % (src_bytes_str)
        self.logger.debug(sql)
        table_conn = self.connect_database(dbname)
        curs = dbconn.execSQL(table_conn, sql)
        rows = curs.fetchall()
        fp = None
        try:
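            # Stage one row per table in a local data file, then bulk-load it
            # into gpexpand.status_detail with COPY further below.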
            sql_file = os.path.abspath('./%s.dat' % status_detail_table)
            self.logger.debug('status_detail data file: %s' % sql_file)
            fp = open(sql_file, 'w')
            for row in rows:
                fqname = row[0]
                schema_oid = row[1]
                table_oid = row[2]
                if row[3]:
                    self.logger.debug("dist policy raw: %s " % row[3].decode('utf-8'))
                else:
                    self.logger.debug("dist policy raw: NULL")
                dist_policy = row[3]
                (policy_name, policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
                rel_bytes = int(row[5])

                if dist_policy is None:
                    dist_policy = 'NULL'

                full_name = '%s.%s' % (dbname, fqname)
                rank = 1 if self.unique_index_tables.has_key(full_name) else 2

                fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\n""" % (
                    dbname, fqname, schema_oid, table_oid,
                    dist_policy, policy_name, policy_oids,
                    rank, undone_status, rel_bytes))
        except Exception, e:
            raise ExpansionError(e)
        finally:
            if fp: fp.close()

        try:
            copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)

            self.logger.debug(copySQL)
            dbconn.execSQL(self.conn, copySQL)
        except Exception, e:
            raise ExpansionError(e)
        finally:
            os.unlink(sql_file)

        table_conn.commit()
        table_conn.close()

    def _populate_partitioned_tables(self, dbname):
        """population of status_detail for partitioned tables. """
        src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(quote_ident(p.partitionschemaname) || '.' || quote_ident(p.partitiontablename))"
        sql = """
SELECT
    p.partitionschemaname || '.' || p.partitiontablename as fq_name,
    n.oid as schemaoid,
    c2.oid as tableoid,
    d.attrnums as distributed_policy,
    now() as last_updated,
    %s,
    partitiontype,partitionlevel,partitionrank,partitionposition,
    partitionrangestart
FROM
    pg_partitions p,
    pg_class c,
    pg_class c2,
    pg_namespace n,
    pg_namespace n2,
    gp_distribution_policy d
WHERE
    quote_ident(p.tablename) = quote_ident(c.relname)
    AND    d.localoid = c2.oid
    AND quote_ident(p.schemaname) = quote_ident(n.nspname)
    AND c.relnamespace = n.oid
    AND p.partitionlevel = (select max(parlevel) FROM pg_partition WHERE parrelid = c.oid)
    AND quote_ident(p.partitionschemaname) = quote_ident(n2.nspname)
    AND quote_ident(p.partitiontablename) = quote_ident(c2.relname)
    AND c2.relnamespace = n2.oid
    AND c2.relstorage != 'x'
ORDER BY tablename, c2.oid desc;
                  """ % (src_bytes_str)
        self.logger.debug(sql)
        table_conn = self.connect_database(dbname)
        curs = dbconn.execSQL(table_conn, sql)
        rows = curs.fetchall()

        fp = None
        try:
            sql_file = os.path.abspath('./%s.dat' % status_detail_table)
            self.logger.debug('status_detail data file: %s' % sql_file)
            fp = open(sql_file, 'w')

            for row in rows:
                fqname = row[0]
                schema_oid = row[1]
                table_oid = row[2]
                if row[3]:
                    self.logger.debug("dist policy raw: %s " % row[3])
                else:
                    self.logger.debug("dist policy raw: NULL")
                dist_policy = row[3]
                (policy_name, policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
                rel_bytes = int(row[5])

                if dist_policy is None:
                    dist_policy = 'NULL'

                full_name = '%s.%s' % (dbname, fqname)
                rank = 1 if self.unique_index_tables.has_key(full_name) else 2

                fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\n""" % (
                    dbname, fqname, schema_oid, table_oid,
                    dist_policy, policy_name, policy_oids,
                    rank, undone_status, rel_bytes))
        except Exception:
            raise
        finally:
            if fp: fp.close()

        try:
            copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)

            self.logger.debug(copySQL)
            dbconn.execSQL(self.conn, copySQL)
        except Exception, e:
            raise ExpansionError(e)
        finally:
            os.unlink(sql_file)

        table_conn.commit()
        table_conn.close()

    def _update_distribution_policy(self, dbname):
        """ NULL out the distribution policy for both
            regular and partitioned tables before expansion
        """

        table_conn = self.connect_database(dbname)
        # null out the dist policies
        sql = """
UPDATE  gp_distribution_policy
  SET attrnums = NULL
FROM pg_class c
    JOIN pg_namespace n ON (c.relnamespace=n.oid)
    LEFT JOIN pg_partition pp ON (c.oid=pp.parrelid)
    LEFT JOIN pg_partition_rule pr ON (c.oid=pr.parchildrelid)
WHERE
    localoid = c.oid
    AND pp.parrelid IS NULL
    AND pr.parchildrelid IS NULL
    AND n.nspname != 'gpexpand';
        """

        self.logger.debug(sql)
        dbconn.execSQL(table_conn, sql)

        sql = """
UPDATE gp_distribution_policy
    SET attrnums = NULL
    FROM
        ( SELECT pp.parrelid AS tableoid,
                 n2.nspname AS partitionschemaname, cl2.relname AS partitiontablename,
                 cl2.oid AS partitiontableoid, pr1.parname AS partitionname, cl3.relname AS parentpartitiontablename, pr2.parname AS parentpartitioname,
                    pp.parlevel AS partitionlevel, pr1.parruleord AS partitionposition
               FROM pg_namespace n, pg_namespace n2, pg_class cl, pg_class cl2, pg_partition pp, pg_partition_rule pr1
          LEFT JOIN pg_partition_rule pr2 ON pr1.parparentrule = pr2.oid
       LEFT JOIN pg_class cl3 ON pr2.parchildrelid = cl3.oid
      WHERE pp.paristemplate = FALSE AND pp.parrelid = cl.oid AND pr1.paroid = pp.oid AND cl2.oid = pr1.parchildrelid AND cl.relnamespace = n.oid AND cl2.relnamespace = n2.oid
    ) p1
    WHERE
    localoid = p1.partitiontableoid
    AND p1.partitionlevel = (SELECT max(parlevel) FROM pg_partition WHERE parrelid = p1.tableoid);

"""
        self.logger.debug(sql)
        dbconn.execSQL(table_conn, sql)
        table_conn.commit()
        table_conn.close()

    def form_dist_policy_name(self, conn, rs_val, table_oid):
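        # rs_val is the raw attrnums value from gp_distribution_policy (e.g. "{1,3}");
        # map each attribute number to its column name and attrelid via pg_attribute.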
        if rs_val is None:
            return (None, None)
        rs_val = rs_val.lstrip('{').rstrip('}').strip()

        namedict = {}
        oiddict = {}
        sql = "select attnum, attname, attrelid from pg_attribute where attrelid =  %s and attnum > 0" % table_oid
        cursor = dbconn.execSQL(conn, sql)
        for row in cursor:
            namedict[row[0]] = row[1]
            oiddict[row[0]] = row[2]

        name_list = []
        oid_list = []

        if rs_val != "":
            rs_list = rs_val.split(',')

            for colnum in rs_list:
                name_list.append(namedict[int(colnum)])
                oid_list.append(str(oiddict[int(colnum)]))

        return (' , '.join(name_list), ' , '.join(oid_list))

    def perform_expansion(self):
        """Performs the actual table re-organiations"""
        expansionStart = datetime.datetime.now()

        # setup a threadpool
        self.queue = WorkerPool(numWorkers=self.numworkers)

        # go through and reset any "IN PROGRESS" tables
        self.conn = dbconn.connect(self.dburl, encoding='UTF8')
        sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STARTED', '%s' ) " % (
            gpexpand_schema, status_table, expansionStart)
        cursor = dbconn.execSQL(self.conn, sql)
        self.conn.commit()

        sql = """UPDATE gpexpand.status_detail set status = '%s' WHERE status = '%s' """ % (undone_status, start_status)
        cursor = dbconn.execSQL(self.conn, sql)
        self.conn.commit()

        # read schema and queue up commands
        sql = "SELECT * FROM %s.%s WHERE status = 'NOT STARTED' ORDER BY rank" % (gpexpand_schema, status_detail_table)
        cursor = dbconn.execSQL(self.conn, sql)

        for row in cursor:
            self.logger.debug(row)
            name = "name"
            tbl = ExpandTable(options=self.options, row=row)
            cmd = ExpandCommand(name=name, status_url=self.dburl, table=tbl, options=self.options)
            self.queue.addCommand(cmd)

        table_expand_error = False

        stopTime = None
        stoppedEarly = False
        if self.options.end:
            stopTime = self.options.end

        # wait till done.
        while not self.queue.isDone():
            logger.debug(
                "woke up.  queue: %d finished %d  " % (self.queue.num_assigned, self.queue.completed_queue.qsize()))
            if stopTime and datetime.datetime.now() >= stopTime:
                stoppedEarly = True
                break
            time.sleep(5)

        expansionStopped = datetime.datetime.now()

        self.pool.haltWork()
        self.pool.joinWorkers()
        self.queue.haltWork()
        self.queue.joinWorkers()

        # Doing this after the halt and join workers guarantees that no new completed items can be added
        # while we're doing a check
        for expandCommand in self.queue.getCompletedItems():
            if expandCommand.table_expand_error:
                table_expand_error = True
                break

        if stoppedEarly:
            logger.info('End time reached.  Stopping expansion.')
            sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STOPPED', '%s' ) " % (
                gpexpand_schema, status_table, expansionStopped)
            cursor = dbconn.execSQL(self.conn, sql)
            self.conn.commit()
            logger.info('You can resume expansion by running gpexpand again')
        elif table_expand_error:
            logger.warn('**************************************************')
            logger.warn('One or more tables failed to expand successfully.')
            logger.warn('Please check the log file, correct the problem and')
            logger.warn('run gpexpand again to finish the expansion process')
            logger.warn('**************************************************')
            # We'll try to update the status, but if the errors were caused by
            # going into read only mode, this will fail.  That's ok though as
            # gpexpand will resume next run
            try:
                sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STOPPED', '%s' ) " % (
                    gpexpand_schema, status_table, expansionStopped)
                cursor = dbconn.execSQL(self.conn, sql)
                self.conn.commit()
            except:
                pass
        else:
            sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION COMPLETE', '%s' ) " % (
                gpexpand_schema, status_table, expansionStopped)
            cursor = dbconn.execSQL(self.conn, sql)
            self.conn.commit()
            logger.info("EXPANSION COMPLETED SUCCESSFULLY")

    def shutdown(self):
        """used if the script is closed abrubtly"""
        logger.info('Shutting down gpexpand...')
        if self.pool:
            self.pool.haltWork()
            self.pool.joinWorkers()

        if self.queue:
            self.queue.haltWork()
            self.queue.joinWorkers()

        try:
            expansionStopped = datetime.datetime.now()
            sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STOPPED', '%s' ) " % (
                gpexpand_schema, status_table, expansionStopped)
            cursor = dbconn.execSQL(self.conn, sql)
            self.conn.commit()

            cursor.close()
            self.conn.close()
        except pg.OperationalError:
            pass
        except Exception:
            # schema doesn't exist.  Cancel or error during setup
            pass

    def halt_work(self):
        if self.pool:
            self.pool.haltWork()
            self.pool.joinWorkers()

        if self.queue:
            self.queue.haltWork()
            self.queue.joinWorkers()

    def cleanup_schema(self, gpexpand_db_status):
        """Removes the gpexpand schema"""
        # drop schema
        if gpexpand_db_status != 'EXPANSION COMPLETE':
            c = dbconn.connect(self.dburl, encoding='UTF8')
            self.logger.warn('Expansion has not yet completed.  Removing the expansion')
            self.logger.warn('schema now will leave the following tables unexpanded:')
            unexpanded_tables_sql = "SELECT fq_name FROM %s.%s WHERE status = 'NOT STARTED' ORDER BY rank" % (
                gpexpand_schema, status_detail_table)

            cursor = dbconn.execSQL(c, unexpanded_tables_sql)
            unexpanded_tables_text = ''.join("\t%s\n" % row[0] for row in cursor)

            c.close()

            self.logger.warn(unexpanded_tables_text)
            self.logger.warn('These tables will have to be expanded manually by setting')
            self.logger.warn('the distribution policy using the ALTER TABLE command.')
            if not ask_yesno('', "Are you sure you want to drop the expansion schema?", 'N'):
                logger.info("User Aborted. Exiting...")
                sys.exit(0)

        # See if user wants to dump the status_detail table to file
        c = dbconn.connect(self.dburl, encoding='UTF8')
        if ask_yesno('', "Do you want to dump the gpexpand.status_detail table to file?", 'Y'):
            self.logger.info(
                "Dumping gpexpand.status_detail to %s/gpexpand.status_detail" % self.options.master_data_directory)
            copy_gpexpand_status_detail_sql = "COPY gpexpand.status_detail TO '%s/gpexpand.status_detail'" % self.options.master_data_directory
            dbconn.execSQL(c, copy_gpexpand_status_detail_sql)

        self.logger.info("Removing gpexpand schema")
        dbconn.execSQL(c, drop_schema_sql)
        c.commit()
        c.close()

    def connect_database(self, dbname):
        test_url = copy.deepcopy(self.dburl)
        test_url.pgdb = dbname
        c = dbconn.connect(test_url, encoding='UTF8', allowSystemTableMods='dml')
        return c

    def sync_packages(self):
        """
        The design decision here is to squash any exceptions resulting from the
        synchronization of packages. We should *not* disturb the user's attempts to expand.
        """
        try:
            logger.info('Syncing Greenplum Database extensions')
            new_segment_list = self.gparray.getExpansionSegDbList()
            new_host_set = set([h.getSegmentHostName() for h in new_segment_list])
            operations = [SyncPackages(host) for host in new_host_set]
            ParallelOperation(operations, self.numworkers).run()
            # introspect outcomes
            for operation in operations:
                operation.get_ret()
        except Exception:
            logger.exception('Syncing of Greenplum Database extensions has failed.')
            logger.warning('Please run gppkg --clean after successful expansion.')

    def move_filespaces(self):
        """
            Move filespaces for temporary and transaction files.
        """

        segments = self.gparray.getExpansionSegDbList()

        cur_filespace_entries = GetFilespaceEntriesDict(GetFilespaceEntries(self.gparray,
                                                                            PG_SYSTEM_FILESPACE
                                                                            ).run()).run()
        pg_system_filespace_entries = cur_filespace_entries
        cur_filespace_name = self.gparray.getFileSpaceName(int(cur_filespace_entries[1][0]))
        segments = self.gparray.getExpansionSegDbList()

        logger.info('Checking if Transaction filespace was moved')
        if os.path.exists(os.path.join(cur_filespace_entries[1][2], GP_TRANSACTION_FILES_FILESPACE)):
            logger.info('Transaction filespace was moved')
            new_filespace_entries = GetFilespaceEntriesDict(GetCurrentFilespaceEntries(self.gparray,
                                                                                       FileType.TRANSACTION_FILES
                                                                                       ).run()).run()
            new_filespace_name = self.gparray.getFileSpaceName(int(new_filespace_entries[1][0]))
            operations_list = GetMoveOperationList(segments,
                                                   FileType.TRANSACTION_FILES,
                                                   new_filespace_name,
                                                   new_filespace_entries,
                                                   cur_filespace_entries,
                                                   pg_system_filespace_entries
                                                   ).run()

            logger.info('Moving Transaction filespace on expansion segments')
            ParallelOperation(operations_list).run()

            logger.debug('Checking results of transaction files move')
            for operation in operations_list:
                try:
                    operation.get_ret()
                except Exception as _:
                    logger.info('Transaction filespace move failed on expansion segment')
                    RollBackFilespaceChanges(segments,
                                             FileType.TRANSACTION_FILES,
                                             cur_filespace_name,
                                             cur_filespace_entries,
                                             new_filespace_entries,
                                             pg_system_filespace_entries,
                                             ).run()
                    raise

        logger.info('Checking if Temporary filespace was moved')
        if os.path.exists(os.path.join(cur_filespace_entries[1][2], GP_TEMPORARY_FILES_FILESPACE)):
            logger.info('Temporary filespace was moved')
            new_filespace_entries = GetFilespaceEntriesDict(GetCurrentFilespaceEntries(self.gparray,
                                                                                       FileType.TEMPORARY_FILES
                                                                                       ).run()).run()
            new_filespace_name = self.gparray.getFileSpaceName(int(new_filespace_entries[1][0]))
            operations_list = GetMoveOperationList(segments,
                                                   FileType.TEMPORARY_FILES,
                                                   new_filespace_name,
                                                   new_filespace_entries,
                                                   cur_filespace_entries,
                                                   pg_system_filespace_entries
                                                   ).run()

            logger.info('Moving Temporary filespace on expansion segments')
            ParallelOperation(operations_list).run()

            logger.debug('Checking results of temporary files move')
            for operation in operations_list:
                try:
                    operation.get_ret()
                except Exception:
                    logger.info('Temporary filespace move failed on expansion segment')
                    RollBackFilespaceChanges(segments,
                                             FileType.TEMPORARY_FILES,
                                             cur_filespace_name,
                                             cur_filespace_entries,
                                             new_filespace_entries,
                                             pg_system_filespace_entries
                                             ).run()
                    raise

        # Update flat files on mirrors
        UpdateFlatFiles(self.gparray, primaries=False, expansion=True).run()


    def validate_heap_checksums(self):
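        """
        Verify that the heap checksum setting reported by every segment matches the
        master before allowing the expansion to proceed.  Returns 1 if no segments
        could be queried over ssh, and raises if any segment disagrees with the master.
        """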
        num_workers = min(len(self.gparray.get_hostlist()), MAX_PARALLEL_EXPANDS)
        heap_checksum_util = HeapChecksum(gparray=self.gparray, num_workers=num_workers, logger=self.logger)
        successes, failures = heap_checksum_util.get_segments_checksum_settings()
        if len(successes) == 0:
            logger.fatal("No segments responded to ssh query for heap checksum. Not expanding the cluster.")
            return 1

        consistent, inconsistent, master_heap_checksum = heap_checksum_util.check_segment_consistency(successes)

        inconsistent_segment_msgs = []
        for segment in inconsistent:
            inconsistent_segment_msgs.append("dbid: %s "
                                             "checksum set to %s differs from master checksum set to %s" %
                                             (segment.getSegmentDbId(), segment.heap_checksum,
                                              master_heap_checksum))

        if not heap_checksum_util.are_segments_consistent(consistent, inconsistent):
            self.logger.fatal("Cluster heap checksum setting differences reported")
            self.logger.fatal("Heap checksum settings on %d of %d segment instances do not match master <<<<<<<<"
                              % (len(inconsistent_segment_msgs), len(self.gparray.segments)))
            self.logger.fatal("Review %s for details" % get_logfile())
            log_to_file_only("Failed checksum consistency validation:", logging.WARN)
            self.logger.fatal("gpexpand error: Cluster will not be modified as checksum settings are not consistent "
                              "across the cluster.")

            for msg in inconsistent_segment_msgs:
                log_to_file_only(msg, logging.WARN)

            raise Exception("Segments have heap_checksum set inconsistently to master")
        else:
            self.logger.info("Heap checksum setting consistent across cluster")


# -----------------------------------------------
class ExpandTable():
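    """
    One table tracked in gpexpand.status_detail: holds its distribution policy and
    status fields, and knows how to mark progress and redistribute the table onto
    the expanded cluster.
    """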
    def __init__(self, options, row=None):
        self.options = options
        if row is not None:
            (self.dbname, self.fq_name, self.schema_oid, self.table_oid,
             self.distrib_policy, self.distrib_policy_names, self.distrib_policy_coloids,
             self.storage_options, self.rank, self.status,
             self.expansion_started, self.expansion_finished,
             self.source_bytes) = row

    def add_table(self, conn):
        insertSQL = """INSERT INTO %s.%s
                            VALUES ('%s','%s',%s,%s,
                                    '%s','%s','%s','%s',%d,'%s','%s','%s',%d)
                    """ % (gpexpand_schema, status_detail_table,
                           self.dbname, self.fq_name, self.schema_oid, self.table_oid,
                           self.distrib_policy, self.distrib_policy_names, self.distrib_policy_coloids,
                           self.storage_options, self.rank, self.status,
                           self.expansion_started, self.expansion_finished,
                           self.source_bytes)
        logger.info('Added table %s.%s' % (self.dbname.decode('utf-8'), self.fq_name.decode('utf-8')))
        logger.debug(insertSQL.decode('utf-8'))
        dbconn.execSQL(conn, insertSQL)

    def mark_started(self, status_conn, table_conn, start_time, cancel_flag):
        if cancel_flag:
            return
        (schema_name, table_name) = self.fq_name.split('.')
        sql = "SELECT pg_relation_size(quote_ident('%s') || '.' || quote_ident('%s'))" % (schema_name, table_name)
        cursor = dbconn.execSQL(table_conn, sql)
        row = cursor.fetchone()
        src_bytes = int(row[0])
        logger.debug(" Table: %s has %d bytes" % (self.fq_name.decode('utf-8'), src_bytes))

        sql = """UPDATE %s.%s
                  SET status = '%s', expansion_started='%s',
                      source_bytes = %d
                  WHERE dbname = '%s' AND schema_oid = %s
                        AND table_oid = %s """ % (gpexpand_schema, status_detail_table,
                                                  start_status, start_time,
                                                  src_bytes, self.dbname,
                                                  self.schema_oid, self.table_oid)

        logger.debug("Mark Started: " + sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()

    def reset_started(self, status_conn):
        sql = """UPDATE %s.%s
                 SET status = '%s', expansion_started=NULL, expansion_finished=NULL
                 WHERE dbname = '%s' AND schema_oid = %s
                 AND table_oid = %s """ % (gpexpand_schema, status_detail_table, undone_status,
                                           self.dbname, self.schema_oid, self.table_oid)

        logger.debug('Resetting detailed_status: %s' % sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()

    def expand(self, table_conn, cancel_flag):
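        # Redistribution is driven by ALTER TABLE ONLY ... SET WITH(REORGANIZE=TRUE):
        # tables with no recorded distribution columns are rewritten DISTRIBUTED
        # RANDOMLY, otherwise DISTRIBUTED BY the original column list, which rewrites
        # the data across all segments, including the new ones.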
        foo = self.distrib_policy_names.strip()
        new_storage_options = ''
        if self.storage_options:
            new_storage_options = ',' + self.storage_options

        (schema_name, table_name) = self.fq_name.split('.')

        logger.info("Distribution policy for table %s is '%s' " % (self.fq_name.decode('utf-8'), foo.decode('utf-8')))
        # logger.info("Storage options for table %s is %s" % (self.fq_name, self.storage_options))

        if foo == "" or foo == "None" or foo is None:
            sql = 'ALTER TABLE ONLY "%s"."%s" SET WITH(REORGANIZE=TRUE%s) DISTRIBUTED RANDOMLY' % (
                schema_name, table_name, new_storage_options)
        else:
            dist_cols = foo.split(',')
            dist_cols = ['"%s"' % x.strip() for x in dist_cols]
            dist_cols = ','.join(dist_cols)
            sql = 'ALTER TABLE ONLY "%s"."%s" SET WITH(REORGANIZE=TRUE%s) DISTRIBUTED BY (%s)' % (
                schema_name, table_name, new_storage_options, dist_cols)

        logger.info('Expanding %s.%s' % (self.dbname.decode('utf-8'), self.fq_name.decode('utf-8')))
        logger.debug("Expand SQL: %s" % sql.decode('utf-8'))

        # check is atomic in python
        if not cancel_flag:
            dbconn.execSQL(table_conn, sql)
            table_conn.commit()
            if self.options.analyze:
                sql = 'ANALYZE "%s"."%s"' % (schema_name, table_name)
                logger.info('Analyzing %s.%s' % (schema_name.decode('utf-8'), table_name.decode('utf-8')))
                dbconn.execSQL(table_conn, sql)
                table_conn.commit()

            return True

        # I can only get here if the cancel flag is True
        return False

    def mark_finished(self, status_conn, start_time, finish_time):
        sql = """UPDATE %s.%s
                  SET status = '%s', expansion_started='%s', expansion_finished='%s'
                  WHERE dbname = '%s' AND schema_oid = %s
                  AND table_oid = %s """ % (gpexpand_schema, status_detail_table,
                                            done_status, start_time, finish_time,
                                            self.dbname, self.schema_oid, self.table_oid)
        logger.debug(sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()

    def mark_does_not_exist(self, status_conn, finish_time):
        sql = """UPDATE %s.%s
                  SET status = '%s', expansion_finished='%s'
                  WHERE dbname = '%s' AND schema_oid = %s
                  AND table_oid = %s """ % (gpexpand_schema, status_detail_table,
                                            does_not_exist_status, finish_time,
                                            self.dbname, self.schema_oid, self.table_oid)
        logger.debug(sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()


# -----------------------------------------------
class PrepFileSpaces(Command):
    """
    This class will connect to a segment backend and execute the gp_prep_new_segment function to set up the filespaces.
    """

    def __init__(self, name, filespaceNames, filespaceLocations, sysDataDirectory, dbid, contentId, ctxt=LOCAL,
                 remoteHost=None):
        self.name = name
        self.filespaceNames = filespaceNames
        self.filespaceLocations = filespaceLocations
        self.sysDataDirectory = sysDataDirectory
        self.dbid = dbid
        self.contentId = contentId
        self.filespaces = []
        for i in range(len(filespaceNames)):
            entry = [filespaceNames[i], filespaceLocations[i]]
            self.filespaces.append(entry)
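        # Build a command that pipes a gp_prep_new_segment() call into a single-user,
        # utility-mode postgres backend started against the new segment's data
        # directory, which sets up the filespaces for that segment.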
        cmdStr = """echo "select * from gp_prep_new_segment( array %s )" """ % (str(self.filespaces))
        cmdStr += """ | $GPHOME/bin/postgres --single --gp_num_contents_in_cluster=1 -O -c gp_session_role=utility -c gp_debug_linger=0 -c gp_before_filespace_setup=true  -E -D %s --gp_dbid=%s --gp_contentid=%s template1""" % (
            self.sysDataDirectory, str(self.dbid), str(self.contentId))
        Command.__init__(self, name, cmdStr, ctxt, remoteHost)


# -----------------------------------------------
class ExecuteSQLStatementsCommand(SQLCommand):
    """
    This class will execute a list of SQL statements.
    """

    def __init__(self, name, url, sqlCommandList):
        self.name = name
        self.url = url
        self.sqlCommandList = sqlCommandList
        self.conn = None
        self.error = None

        SQLCommand.__init__(self, name)
        pass

    def run(self, validateAfter=False):
        statement = None

        faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
        if faultPoint and self.name and self.name.startswith(faultPoint):
            # simulate error
            self.results = CommandResult(1, 'Fault Injection', 'Fault Injection', False, True)
            self.error = "Fault Injection"
            return

        self.results = CommandResult(rc=0
                                     , stdout=""
                                     , stderr=""
                                     , completed=True
                                     , halt=False
                                     )

        try:
            self.conn = dbconn.connect(self.url, utility=True, encoding='UTF8', allowSystemTableMods='dml')
            for statement in self.sqlCommandList:
                dbconn.execSQL(self.conn, statement)
            self.conn.commit()
        except Exception, e:
            # traceback.print_exc()
            logger.error("Exception in ExecuteSQLStatements. URL = %s" % str(self.url))
            logger.error("  Statement = %s" % str(statement))
            logger.error("  Exception = %s" % str(e))
            self.error = str(e)
            self.results = CommandResult(rc=1
                                         , stdout=""
                                         , stderr=str(e)
                                         , completed=False
                                         , halt=True
                                         )
        finally:
            if self.conn != None:
                self.conn.close()

    def set_results(self, results):
        raise ExecutionError("TODO:  must implement", None)

    def get_results(self):
        return self.results

    def was_successful(self):
        if self.error != None:
            return False
        else:
            return True

    def validate(self, expected_rc=0):
        raise ExecutionError("TODO:  must implement", None)


# -----------------------------------------------
class ExpandCommand(SQLCommand):
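    """
    Redistributes a single table.  Opens one connection to the gpexpand status
    database and one to the table's own database, verifies the table still exists,
    performs the expansion, and records the outcome in gpexpand.status_detail.
    """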
    def __init__(self, name, status_url, table, options):
        self.status_url = status_url
        self.table = table
        self.options = options
        self.cmdStr = "Expand %s.%s" % (table.dbname, table.fq_name)
        self.table_url = copy.deepcopy(status_url)
        self.table_url.pgdb = table.dbname
        self.table_expand_error = False

        SQLCommand.__init__(self, name)
        pass

    def run(self, validateAfter=False):
        # connect.
        status_conn = None
        table_conn = None
        table_exp_success = False

        try:
            status_conn = dbconn.connect(self.status_url, encoding='UTF8')
            table_conn = dbconn.connect(self.table_url, encoding='UTF8')
        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error(ex.__str__().strip())
            if status_conn: status_conn.close()
            if table_conn: table_conn.close()
            self.table_expand_error = True
            return

        # validate table hasn't been dropped
        start_time = None
        try:
            (schema_name, table_name) = self.table.fq_name.split('.')
            sql = """select * from pg_class c, pg_namespace n
            where c.relname = '%s' and n.oid = c.relnamespace and n.nspname='%s'""" % (table_name, schema_name)

            cursor = dbconn.execSQL(table_conn, sql)

            if cursor.rowcount == 0:
                logger.info('%s.%s no longer exists in database %s' % (schema_name.decode('utf-8'),
                                                                       table_name.decode('utf-8'),
                                                                       self.table.dbname.decode('utf-8')))

                self.table.mark_does_not_exist(status_conn, datetime.datetime.now())
                status_conn.close()
                table_conn.close()
                return
            else:
                # Set conn for cancel
                self.cancel_conn = table_conn
                start_time = datetime.datetime.now()
                if not self.options.simple_progress:
                    self.table.mark_started(status_conn, table_conn, start_time, self.cancel_flag)

                table_exp_success = self.table.expand(table_conn, self.cancel_flag)

        except Exception, ex:
            if ex.__str__().find('canceling statement due to user request') == -1 and not self.cancel_flag:
                self.table_expand_error = True
                if self.options.verbose:
                    logger.exception(ex)
                logger.error('Table %s.%s failed to expand: %s' % (self.table.dbname.decode('utf-8'),
                                                                   self.table.fq_name.decode('utf-8'),
                                                                   ex.__str__().strip()))
            else:
                logger.info('ALTER TABLE of %s.%s canceled' % (
                    self.table.dbname.decode('utf-8'), self.table.fq_name.decode('utf-8')))

        if table_exp_success:
            end_time = datetime.datetime.now()
            # update metadata
            logger.info(
                "Finished expanding %s.%s" % (self.table.dbname.decode('utf-8'), self.table.fq_name.decode('utf-8')))
            self.table.mark_finished(status_conn, start_time, end_time)
        elif not self.options.simple_progress:
            logger.info("Reseting status_detail for %s.%s" % (
                self.table.dbname.decode('utf-8'), self.table.fq_name.decode('utf-8')))
            self.table.reset_started(status_conn)

        # disconnect
        status_conn.close()
        table_conn.close()

    def set_results(self, results):
        raise ExecutionError("TODO:  must implement", None)

    def get_results(self):
        raise ExecutionError("TODO:  must implement", None)

    def was_successful(self):
        raise ExecutionError("TODO:  must implement", None)

    def validate(self, expected_rc=0):
        raise ExecutionError("TODO:  must implement", None)


# ------------------------------- UI Help --------------------------------
def read_hosts_file(hosts_file):
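    """Return the hostnames listed in hosts_file, ignoring blank lines and # comments."""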
    new_hosts = []
    try:
        f = open(hosts_file, 'r')
        try:
            for l in f:
                if l.strip().startswith('#') or l.strip() == '':
                    continue

                new_hosts.append(l.strip())

        finally:
            f.close()
    except IOError:
        raise ExpansionError('Hosts file %s not found' % hosts_file)

    return new_hosts


def interview_setup(gparray, options):
    help = """
System Expansion is used to add segments to an existing GPDB array.
gpexpand did not detect a System Expansion that is in progress.

Before initiating a System Expansion, you need to provision and burn-in
the new hardware.  Please be sure to run gpcheckperf to make sure the
new hardware is working properly.

Please refer to the Admin Guide for more information."""

    if not ask_yesno(help, "Would you like to initiate a new System Expansion", 'N'):
        logger.info("User Aborted. Exiting...")
        sys.exit(0)

    help = """
This utility can handle some expansion scenarios by asking a few questions.
More complex expansions can be done by providing an input file with
the --input <file>.  Please see the docs for the format of this file. """

    standard, message = gparray.isStandardArray()
    if standard == False:
        help = help + """

       The current system appears to be non-standard.
       """
        help = help + message
        help = help + """
       gpexpand may not be able to symmetrically distribute the new segments appropriately.
       It is recommended that you specify your own input file with appropriate values."""
        if not ask_yesno(help, "Are you sure you want to continue with this gpexpand session?", 'N'):
            logger.info("User Aborted. Exiting...")
            sys.exit(0)

    help = help + """

We'll now ask you a few questions to try and build this file for you.
You'll have the opportunity to save this file and inspect it/modify it
before continuing by re-running this utility and providing the input file. """

    def datadir_validator(input_value, *args):
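        # A candidate data directory is rejected (None) if it is empty or contains a space.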
        if not input_value or input_value.find(' ') != -1 or input_value == '':
            return None
        else:
            return input_value

    if options.hosts_file:
        new_hosts = read_hosts_file(options.hosts_file)
    else:
        new_hosts = ask_list(None,
                             "\nEnter a comma separated list of new hosts you want\n" \
                             "to add to your array.  Do not include interface hostnames.\n" \
                             "**Enter a blank line to only add segments to existing hosts**", [])
        new_hosts = [host.strip() for host in new_hosts]

    num_new_hosts = len(new_hosts)

    mirror_type = 'none'

    if gparray.get_mirroring_enabled():
        if num_new_hosts < 2:
            raise ExpansionError('You must be adding two or more hosts when expanding a system with mirroring enabled.')
        mirror_type = ask_string(
            "\nYou must now specify a mirroring strategy for the new hosts.  Spread mirroring places\n" \
            "a given hosts mirrored segments each on a separate host.  You must be \n" \
            "adding more hosts than the number of segments per host to use this. \n" \
            "Grouped mirroring places all of a given hosts segments on a single \n" \
            "mirrored host.  You must be adding at least 2 hosts in order to use this.\n\n",
            "What type of mirroring strategy would you like?",
            'grouped', ['spread', 'grouped'])

    try:
        gparray.addExpansionHosts(new_hosts, mirror_type)
        gparray.validateExpansionSegs()
    except Exception, ex:
        num_new_hosts = 0
        if ex.__str__() == 'No new hosts to add':
            print
            print '** No hostnames were given that do not already exist in the **'
            print '** array. Additional segments will be added to existing hosts. **'
        else:
            raise

    help = """
    By default, new hosts are configured with the same number of primary
    segments as existing hosts.  Optionally, you can increase the number
    of segments per host.

    For example, if existing hosts have two primary segments, entering a value
    of 2 will initialize two additional segments on existing hosts, and four
    segments on new hosts.  In addition, mirror segments will be added for
    these new primary segments if mirroring is enabled.
    """
    num_new_datadirs = ask_int(help, "How many new primary segments per host do you want to add?", None, 0, 0, 128)

    if num_new_datadirs > 0:
        new_datadirs = []
        new_fsDirs = []
        new_mirrordirs = []
        new_mirrorFsDirs = []

        gpFSobjList = gparray.getFilespaces(includeSystemFilespace=False)

        for i in range(1, num_new_datadirs + 1):
            new_datadir = ask_input(None, 'Enter new primary data directory %d' % i, '',
                                    '/data/gpdb_p%d' % i, datadir_validator, None)
            new_datadirs.append(new_datadir.strip())

            fsDict = {}
            for fsObj in gpFSobjList:
                # Prompt the user for a location for each filespace
                fsLoc = ask_input(None
                                  , 'Enter new file space location for file space name: %s' % fsObj.getName()
                                  , ''
                                  , ''
                                  , datadir_validator
                                  , None
                                  )
                fsDict[fsObj.getOid()] = fsLoc.strip()
            new_fsDirs.append(fsDict)

        if len(new_datadirs) != num_new_datadirs:
            raise ExpansionError(
                'The number of data directories entered does not match the number of primary segments added')

        if gparray.get_mirroring_enabled():
            for i in range(1, num_new_datadirs + 1):
                new_mirrordir = ask_input(None, 'Enter new mirror data directory %d' % i, '',
                                          '/data/gpdb_m%d' % i, datadir_validator, None)
                new_mirrordirs.append(new_mirrordir.strip())

                fsDict = {}
                for fsObj in gpFSobjList:
                    # Prompt the user for a location for each filespace
                    fsLoc = ask_input(None
                                      , 'Enter new file space location for file space name: %s' % fsObj.getName()
                                      , ''
                                      , ''
                                      , datadir_validator
                                      , None
                                      )
                    fsDict[fsObj.getOid()] = fsLoc.strip()
                new_mirrorFsDirs.append(fsDict)

            if len(new_mirrordirs) != num_new_datadirs:
                raise ExpansionError(
                    'The number of new mirror data directories entered does not match the number of segments added')

        gparray.addExpansionDatadirs(datadirs=new_datadirs
                                     , mirrordirs=new_mirrordirs
                                     , mirror_type=mirror_type
                                     , fs_dirs=new_fsDirs
                                     , fs_mirror_dirs=new_mirrorFsDirs
                                     )
        try:
            gparray.validateExpansionSegs()
        except Exception, ex:
            if ex.__str__().find('Port') == 0:
                raise ExpansionError(
                    'Current primary and mirror ports are contiguous.  The input file for gpexpand will need to be created manually.')
    elif num_new_hosts == 0:
        raise ExpansionError('No new hosts or segments were entered.')

    print "\nGenerating configuration file...\n"

    outfile = _gp_expand.generate_inputfile()
    outFilespaceFileName = _gp_expand.generate_filespaces_inputfile(outFileNamePrefix=outfile)

    outFileStr = ""
    if outfile != None:
        outFileStr = """\nInput configuration files were written to '%s' and '%s'.""" % (outfile, outFilespaceFileName)
    else:
        outFileStr = """\nInput configuration file was written to '%s'.""" % (outfile)

    print outFileStr
    print """Please review the file and make sure that it is correct then re-run
with: gpexpand -i %s %s
                """ % (outfile, '-D %s' % options.database if options.database else '')


def sig_handler(sig):
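    # Shut down any expansion work in progress, restore the default handlers, and
    # re-deliver the signal to this process so the default signal behavior applies.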
    if _gp_expand is not None:
        _gp_expand.shutdown()

    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)

    # raise sig
    os.kill(os.getpid(), sig)


# --------------------------------------------------------------------------
# Main
# --------------------------------------------------------------------------
def main(options, args, parser):
    global _gp_expand

    remove_pid = True
    try:
        # setup signal handlers so we can clean up correctly
        signal.signal(signal.SIGTERM, sig_handler)
        signal.signal(signal.SIGHUP, sig_handler)

        logger = get_default_logger()
        setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())

        options, args = validate_options(options, args, parser)

        if options.verbose:
            enable_verbose_logging()

        if is_gpexpand_running(options.master_data_directory):
            logger.error('gpexpand is already running.  Only one instance')
            logger.error('of gpexpand is allowed at a time.')
            remove_pid = False
            sys.exit(1)
        else:
            create_pid_file(options.master_data_directory)

        # prepare provider for updateSystemConfig
        gpEnv = GpMasterEnvironment(options.master_data_directory, True)
        configurationInterface.registerConfigurationProvider(
            configurationImplGpdb.GpConfigurationProviderUsingGpdbCatalog())
        configurationInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort())

        dburl = dbconn.DbURL()
        if options.database:
            dburl.pgdb = options.database

        gpexpand_db_status = gpexpand.prepare_gpdb_state(logger, dburl, options)

        # Get array configuration
        try:
            gparray = GpArray.initFromCatalog(dburl, utility=True)
        except DatabaseError, ex:
            logger.error('Failed to connect to database.  Make sure the')
            logger.error('Greenplum instance you wish to expand is running')
            logger.error('and that your environment is correct, then rerun')
            logger.error('gpexpand ' + ' '.join(sys.argv[1:]))
            gpexpand.get_gpdb_in_state(GPDB_STARTED, options)
            sys.exit(1)

        _gp_expand = gpexpand(logger, gparray, dburl, options, parallel=options.parallel)

        gpexpand_file_status = None
        if not gpexpand_db_status:
            gpexpand_file_status = _gp_expand.get_state()

        if options.clean and gpexpand_db_status is not None:
            _gp_expand.cleanup_schema(gpexpand_db_status)
            logger.info('Cleanup Finished.  exiting...')
            sys.exit(0)

        if options.rollback:
            try:
                if gpexpand_db_status:
                    logger.error('A previous expansion is either in progress or has')
                    logger.error('completed.  Since the setup portion of the expansion')
                    logger.error('has finished successfully there is nothing to rollback.')
                    sys.exit(1)
                if gpexpand_file_status is None:
                    logger.error('There is no partially completed setup to rollback.')
                    sys.exit(1)
                _gp_expand.rollback(dburl)
                logger.info('Rollback complete.  Greenplum Database can now be started')
                sys.exit(0)
            except ExpansionError, e:
                logger.error(e)
                sys.exit(1)

        if gpexpand_db_status == 'SETUP DONE' or gpexpand_db_status == 'EXPANSION STOPPED':
            if not _gp_expand.validate_max_connections():
                raise ValidationError()
            _gp_expand.perform_expansion()
        elif gpexpand_db_status == 'EXPANSION STARTED':
            logger.info('It appears the last run of gpexpand did not exit cleanly.')
            logger.info('Resuming the expansion process...')
            if not _gp_expand.validate_max_connections():
                raise ValidationError()
            _gp_expand.perform_expansion()
        elif gpexpand_db_status == 'EXPANSION COMPLETE':
            logger.info('Expansion has already completed.')
            logger.info('If you want to expand again, run gpexpand -c to remove')
            logger.info('the gpexpand schema and begin a new expansion')
        elif gpexpand_db_status == None and gpexpand_file_status == None and options.filename:
            if not _gp_expand.validate_unalterable_tables():
                raise ValidationError()
            if _gp_expand.check_unique_indexes():
                logger.info("Tables with unique indexes exist.  Until these tables are successfully")
                logger.info("redistributed, unique constraints may be violated.  For more information")
                logger.info("on this issue, see the Greenplum Database Administrator Guide")
                if not options.silent:
                    if not ask_yesno(None, "Would you like to continue with System Expansion", 'N'):
                        raise ValidationError()
            _gp_expand.validate_heap_checksums()
            newSegList = _gp_expand.read_input_files()
            _gp_expand.addNewSegments(newSegList)
            _gp_expand.sync_packages()
            _gp_expand.start_prepare()
            _gp_expand.add_segments()
            _gp_expand.update_original_segments()
            _gp_expand.update_catalog()
            _gp_expand.move_filespaces()
            _gp_expand.configure_new_segment_filespaces()
            _gp_expand.cleanup_new_segments()
            _gp_expand.setup_schema()
            _gp_expand.prepare_schema()
            logger.info('Starting Greenplum Database')
            GpStart.local('gpexpand expansion prepare final start')
            _gp_expand.sync_new_mirrors()
            logger.info('************************************************')
            logger.info('Initialization of the system expansion complete.')
            logger.info('To begin table expansion onto the new segments')
            logger.info('rerun gpexpand')
            logger.info('************************************************')
        elif options.filename is None and gpexpand_file_status == None:
            interview_setup(gparray, options)
        else:
            logger.error('The last gpexpand setup did not complete successfully.')
            logger.error('Please run gpexpand -r to rollback to the original state.')

        logger.info("Exiting...")
        sys.exit(0)

    except ValidationError:
        logger.info('Bringing Greenplum Database back online...')
        if _gp_expand is not None:
            _gp_expand.shutdown()
        gpexpand.get_gpdb_in_state(GPDB_STARTED, options)
        sys.exit()
    except Exception, e:
        if options and options.verbose:
            logger.exception("gpexpand failed. exiting...")
        else:
            logger.error("gpexpand failed: %s \n\nExiting..." % e)
        if _gp_expand is not None and _gp_expand.pastThePointOfNoReturn == True:
            logger.error(
                'gpexpand is past the point of rollback. Any remaining issues must be addressed outside of gpexpand.')
        if _gp_expand is not None:
            if gpexpand_db_status is None and _gp_expand.get_state() is None:
                logger.info('Bringing Greenplum Database back online...')
                gpexpand.get_gpdb_in_state(GPDB_STARTED, options)
            else:
                if _gp_expand.pastThePointOfNoReturn == False:
                    logger.error('Please run \'gpexpand -r%s\' to rollback to the original state.' % (
                        '' if not options.database else ' -D %s' % options.database))
            _gp_expand.shutdown()
        sys.exit(3)
    except KeyboardInterrupt:
        # Disable SIGINT while we shutdown.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if _gp_expand is not None:
            _gp_expand.shutdown()

        # Re-enable SIGINT
        signal.signal(signal.SIGINT, signal.default_int_handler)

        sys.exit('\nUser Interrupted')


    finally:
        try:
            if remove_pid and options:
                remove_pid_file(options.master_data_directory)
        except NameError:
            pass

        if _gp_expand is not None:
            _gp_expand.halt_work()

if __name__ == '__main__':
    options, args, parser = parseargs()
    main(options, args, parser)