#!/usr/bin/env python
# Line too long            - pylint: disable=C0301
# Invalid name             - pylint: disable=C0103
#
# Copyright (c) Greenplum Inc 2008. All Rights Reserved.
#
from gppylib.mainUtils import getProgramName

import copy
import datetime
import os
import sys
import socket
import signal
import traceback
from time import strftime, sleep

try:
    from gppylib.commands.unix import *
    from gppylib.fault_injection import inject_fault
    from gppylib.commands.gp import *
    from gppylib.gparray import GpArray, MODE_CHANGELOGGING, STATUS_DOWN
    from gppylib.gpparseopts import OptParser, OptChecker
    from gppylib.gplog import *
    from gppylib.db import catalog
    from gppylib.db import dbconn
    from gppylib.userinput import *
    from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS
    from gppylib.system import configurationInterface, configurationImplGpdb
    from gppylib.system.environment import GpMasterEnvironment
    from pygresql.pgdb import DatabaseError
    from pygresql import pg
    from gppylib.gpcatalog import MASTER_ONLY_TABLES
    from gppylib.operations.package import SyncPackages
    from gppylib.operations.utils import ParallelOperation
    from gppylib.parseutils import line_reader, parse_gpexpand_segment_line, \
        canonicalize_address
    from gppylib.heapchecksum import HeapChecksum

except ImportError, e:
    sys.exit('ERROR: Cannot import modules.  Please check that you have sourced greenplum_path.sh.  Detail: ' + str(e))

# constants
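# MAX_PARALLEL_EXPANDS caps the -n option (tables redistributed at a time);
# MAX_BATCH_SIZE caps the -B option (expansion configuration batch size).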
MAX_PARALLEL_EXPANDS = 96
MAX_BATCH_SIZE = 128

GPDB_STOPPED = 1
GPDB_STARTED = 2
GPDB_UTILITY = 3

SEGMENT_CONFIGURATION_BACKUP_FILE = "gpexpand.gp_segment_configuration"

# global variable
_gp_expand = None

description = ("""
Adds additional segments to a pre-existing GPDB Array.
""")

_help = ["""
The input file should be a plain text file with a line for each segment
to add with the format:

  <hostname>:<address>:<port>:<datadir>:<dbid>:<content_id>:<preferred_role>
""",
         """
         If an input file is not specified, gpexpand will ask a series of questions
         and create one.
         """,
         ]

_TODO = ["""

Remaining TODO items:
====================
""",

         """* smarter heuristics on setting ranks. """,

         """* make sure system isn't in "readonly mode" during setup. """,

         """* need a startup validation where we check the status detail
             with gp_distribution_policy and make sure that our
             bookkeeping matches reality. We don't have a perfect transactional
             model since the tables can be in a different database from
             where the gpexpand schema is kept. """,

         """* currently requires that GPHOME and PYTHONPATH be set on all of the remote hosts of
              the system.  should get rid of this requirement. """
         ]

_usage = """[-f hosts_file] [-D database_name]

gpexpand -i input_file [-D database_name] [-B batch_size] [-V] [-t segment_tar_dir] [-S]

gpexpand [-d duration[hh][:mm[:ss]] | [-e 'YYYY-MM-DD hh:mm:ss']]
         [-a] [-n parallel_processes] [-D database_name]

gpexpand -r [-D database_name]

gpexpand -c [-D database_name]

gpexpand -? | -h | --help | --verbose | -v"""

EXECNAME = os.path.split(__file__)[-1]


# ----------------------- Command line option parser ----------------------

def parseargs():
    parser = OptParser(option_class=OptChecker,
                       description=' '.join(description.split()),
                       version='%prog version $Revision$')
    parser.setHelp(_help)
    parser.set_usage('%prog ' + _usage)
    parser.remove_option('-h')

    parser.add_option('-c', '--clean', action='store_true',
                      help='remove the expansion schema.')
    parser.add_option('-r', '--rollback', action='store_true',
                      help='rollback failed expansion setup.')
    parser.add_option('-a', '--analyze', action='store_true',
                      help='Analyze the expanded table after redistribution.')
    parser.add_option('-d', '--duration', type='duration', metavar='[h][:m[:s]]',
                      help='duration from beginning to end.')
    parser.add_option('-e', '--end', type='datetime', metavar='datetime',
                      help="ending date and time in the format 'YYYY-MM-DD hh:mm:ss'.")
    parser.add_option('-i', '--input', dest="filename",
                      help="input expansion configuration file.", metavar="FILE")
    parser.add_option('-f', '--hosts-file', metavar='<hosts_file>',
                      help='file containing new host names used to generate input file')
    parser.add_option('-D', '--database', dest='database',
                      help='Database to create the gpexpand schema and tables in.  If this ' \
                           'option is not given, PGDATABASE will be used.  The template1, ' \
                           'template0 and postgres databases cannot be used.')
    parser.add_option('-B', '--batch-size', type='int', default=16, metavar="<batch_size>",
                      help='Expansion configuration batch size. Valid values are 1-%d' % MAX_BATCH_SIZE)
    parser.add_option('-n', '--parallel', type="int", default=1, metavar="<parallel_processes>",
                      help='number of tables to expand at a time. Valid values are 1-%d.' % MAX_PARALLEL_EXPANDS)
    parser.add_option('-v', '--verbose', action='store_true',
                      help='debug output.')
    parser.add_option('-S', '--simple-progress', action='store_true',
                      help='show simple progress.')
    parser.add_option('-t', '--tardir', default='.', metavar="FILE",
                      help='Tar file directory.')
    parser.add_option('-h', '-?', '--help', action='help',
                      help='show this help message and exit.')
    parser.add_option('-s', '--silent', action='store_true',
                      help='Do not prompt for confirmation to proceed on warnings')
    parser.add_option('--usage', action="briefhelp")

    parser.set_defaults(verbose=False, filters=[], slice=(None, None))

    # Parse the command line arguments
    (options, args) = parser.parse_args()
    return options, args, parser

def validate_options(options, args, parser):
    if len(args) > 0:
        logger.error('Unknown argument %s' % args[0])
        parser.exit()

    # -n sanity check
    if options.parallel > MAX_PARALLEL_EXPANDS or options.parallel < 1:
        logger.error('Invalid argument.  parallel value must be >= 1 and <= %d' % MAX_PARALLEL_EXPANDS)
        parser.print_help()
        parser.exit()

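    # -B defaults to 16; if it is still at that default and GP_MGMT_PROCESS_COUNT
    # is set in the environment, use the environment value instead.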
    proccount = os.environ.get('GP_MGMT_PROCESS_COUNT')
    if options.batch_size == 16 and proccount is not None:
        options.batch_size = int(proccount)

    if options.batch_size < 1 or options.batch_size > MAX_BATCH_SIZE:
        logger.error('Invalid argument.  -B value must be >= 1 and <= %s' % MAX_BATCH_SIZE)
        parser.print_help()
        parser.exit()

    # OptParse can return date instead of datetime so we might need to convert
    if options.end and not isinstance(options.end, datetime.datetime):
        options.end = datetime.datetime.combine(options.end, datetime.time(0))

    if options.end and options.end < datetime.datetime.now():
        logger.error('End time occurs in the past')
        parser.print_help()
        parser.exit()

    if options.end and options.duration:
        logger.warn('Both end and duration options were given.')
        # Both a duration and an end time were given.
        if options.end > datetime.datetime.now() + options.duration:
            logger.warn('The duration argument will be used for the expansion end time.')
            options.end = datetime.datetime.now() + options.duration
        else:
            logger.warn('The end argument will be used for the expansion end time.')
    elif options.duration:
        options.end = datetime.datetime.now() + options.duration

    # -c and -r options are mutually exclusive
    if options.rollback and options.clean:
        rollbackOpt = "--rollback" if "--rollback" in sys.argv else "-r"
        cleanOpt = "--clean" if "--clean" in sys.argv else "-c"
        logger.error("%s and %s options cannot be specified together." % (rollbackOpt, cleanOpt))
        parser.exit()

    try:
        options.master_data_directory = get_masterdatadir()
        options.gphome = get_gphome()
    except GpError, msg:
        logger.error(msg)
        parser.exit()

    if not os.path.exists(options.master_data_directory):
        logger.error('Master data directory %s does not exist.' % options.master_data_directory)
        parser.exit()

    if options.database and (options.database.lower() == 'template0'
                             or options.database.lower() == 'template1'
                             or options.database.lower() == 'postgres'):
        logger.error('%s cannot be used to store the gpexpand schema and tables' % options.database)
        parser.exit()
    elif not options.database:
        options.database = os.getenv('PGDATABASE')

    options.pgport = int(os.getenv('PGPORT', 5432))

    return options, args


# -------------------------------------------------------------------------
# process information functions
def create_pid_file(master_data_directory):
    """Creates gpexpand pid file"""
    fp = None
    try:
        fp = open(master_data_directory + '/gpexpand.pid', 'w')
        fp.write(str(os.getpid()))
    except IOError:
        raise
    finally:
        if fp: fp.close()


def remove_pid_file(master_data_directory):
    """Removes gpexpand pid file"""
    try:
        os.unlink(master_data_directory + '/gpexpand.pid')
    except:
        pass


def is_gpexpand_running(master_data_directory):
    """Checks if there is another instance of gpexpand running"""
    is_running = False
    try:
        fp = open(master_data_directory + '/gpexpand.pid', 'r')
        pid = int(fp.readline().strip())
        fp.close()
        is_running = check_pid(pid)
    except IOError:
        pass
    except Exception:
        raise

    return is_running


def gpexpand_status_file_exists(master_data_directory):
    """Checks if gpexpand.pid exists"""
    return os.path.exists(master_data_directory + '/gpexpand.status')


# -------------------------------------------------------------------------
# expansion schema

undone_status = "NOT STARTED"
start_status = "IN PROGRESS"
done_status = "COMPLETED"
does_not_exist_status = 'NO LONGER EXISTS'

gpexpand_schema = 'gpexpand'
create_schema_sql = "CREATE SCHEMA " + gpexpand_schema
drop_schema_sql = "DROP schema IF EXISTS %s CASCADE" % gpexpand_schema

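# gpexpand.status records coarse expansion milestones (one row per state change);
# gpexpand.status_detail tracks per-table redistribution state, timing and size.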
status_table = 'status'
status_table_sql = """CREATE TABLE %s.%s
                        ( status text,
                          updated timestamp ) """ % (gpexpand_schema, status_table)

status_detail_table = 'status_detail'
status_detail_table_sql = """CREATE TABLE %s.%s
                        ( dbname text,
                          fq_name text,
                          schema_oid oid,
                          table_oid oid,
                          distribution_policy smallint[],
                          distribution_policy_names text,
                          distribution_policy_coloids text,
                          distribution_policy_type text,
                          root_partition_name text,
                          storage_options text,
                          rank int,
                          status text,
                          expansion_started timestamp,
                          expansion_finished timestamp,
                          source_bytes numeric ) """ % (gpexpand_schema, status_detail_table)
# gpexpand views
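# expansion_progress comes in two flavors: a simple version that only counts
# tables expanded/remaining (presumably backing the -S/--simple-progress output),
# and a detailed version below that also reports bytes and estimated rate/ETA.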
progress_view = 'expansion_progress'
progress_view_simple_sql = """CREATE VIEW %s.%s AS
SELECT
    CASE status
        WHEN '%s' THEN 'Tables Expanded'
        WHEN '%s' THEN 'Tables Left'
    END AS Name,
    count(*)::text AS Value
FROM %s.%s GROUP BY status""" % (gpexpand_schema, progress_view,
                                 done_status, undone_status, gpexpand_schema, status_detail_table)

progress_view_sql = """CREATE VIEW %s.%s AS
SELECT
    CASE status
        WHEN '%s' THEN 'Tables Expanded'
        WHEN '%s' THEN 'Tables Left'
        WHEN '%s' THEN 'Tables In Progress'
    END AS Name,
    count(*)::text AS Value
FROM %s.%s GROUP BY status

UNION

SELECT
    CASE status
        WHEN '%s' THEN 'Bytes Done'
        WHEN '%s' THEN 'Bytes Left'
        WHEN '%s' THEN 'Bytes In Progress'
    END AS Name,
    SUM(source_bytes)::text AS Value
FROM %s.%s GROUP BY status

UNION

SELECT
    'Estimated Expansion Rate' AS Name,
    (SUM(source_bytes) / (1 + extract(epoch FROM (max(expansion_finished) - min(expansion_started)))) / 1024 / 1024)::text || ' MB/s' AS Value
FROM %s.%s
WHERE status = '%s'
AND
expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY updated DESC LIMIT 1)

UNION

SELECT
'Estimated Time to Completion' AS Name,
CAST((SUM(source_bytes) / (
SELECT 1 + SUM(source_bytes) / (1 + (extract(epoch FROM (max(expansion_finished) - min(expansion_started)))))
FROM %s.%s
WHERE status = '%s'
AND
expansion_started > (SELECT updated FROM %s.%s WHERE status = '%s' ORDER BY
updated DESC LIMIT 1)))::text || ' seconds' as interval)::text AS Value
FROM %s.%s
WHERE status = '%s'
  OR status = '%s'""" % (gpexpand_schema, progress_view,
                         done_status, undone_status, start_status,
                         gpexpand_schema, status_detail_table,
                         done_status, undone_status, start_status,
                         gpexpand_schema, status_detail_table,
                         gpexpand_schema, status_detail_table,
                         done_status,
                         gpexpand_schema, status_table,
                         'EXPANSION STARTED',
                         gpexpand_schema, status_detail_table,
                         done_status,
                         gpexpand_schema, status_table,
                         'EXPANSION STARTED',
                         gpexpand_schema, status_detail_table,
                         start_status, undone_status)

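# Used by validate_unalterable_tables(): finds tables that still carry dropped
# columns of user-defined types (their stored attribute layout no longer matches
# any defined base type), which makes them impossible to ALTER for redistribution.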
unalterable_table_sql = """
SELECT
    current_database() AS database,
    pg_catalog.quote_ident(nspname) || '.' ||
    pg_catalog.quote_ident(relname) AS table,
    attnum,
    attlen,
    attbyval,
    attstorage,
    attalign,
    atttypmod,
    attndims,
    reltoastrelid != 0 AS istoasted
FROM
    pg_catalog.pg_attribute,
    pg_catalog.pg_class,
    pg_catalog.pg_namespace
WHERE
    attisdropped
    AND attnum >= 0
    AND attrelid = pg_catalog.pg_class.oid
    AND relnamespace = pg_catalog.pg_namespace.oid
    AND (attlen, attbyval, attalign, attstorage) NOT IN
        (SELECT typlen, typbyval, typalign, typstorage
        FROM pg_catalog.pg_type
        WHERE typisdefined AND typtype='b' )
ORDER BY
    attrelid, attnum
"""

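# Used by check_unique_indexes(): lists user tables (outside system/internal
# schemas) that have a unique index, so they can be flagged during expansion.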
has_unique_index_sql = """
SELECT
    current_database() || '.' || pg_catalog.quote_ident(nspname) || '.' || pg_catalog.quote_ident(relname) AS table
FROM
    pg_class c,
    pg_namespace n,
    pg_index i
WHERE
  i.indrelid = c.oid
  AND c.relnamespace = n.oid
  AND i.indisunique
  AND n.nspname NOT IN ('pg_catalog', 'information_schema', 'pg_toast',
                        'pg_bitmapindex', 'pg_aoseg')
"""


# -------------------------------------------------------------------------
class InvalidStatusError(Exception): pass


class ValidationError(Exception): pass


# -------------------------------------------------------------------------
class GpExpandStatus():
    """Class that manages gpexpand status file.

    The status file is placed in the master data directory on both the master and
    the standby master.  It's used to keep track of where we are in the progression.
    """

    def __init__(self, logger, master_data_directory, master_mirror=None):
        self.logger = logger

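        # Ordered expansion-setup phases; set_status() only permits advancing
        # one step at a time through this sequence.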
        self._status_values = {'UNINITIALIZED': 1,
                               'EXPANSION_PREPARE_STARTED': 2,
                               'BUILD_SEGMENT_TEMPLATE_STARTED': 3,
                               'BUILD_SEGMENT_TEMPLATE_DONE': 4,
                               'BUILD_SEGMENTS_STARTED': 5,
                               'BUILD_SEGMENTS_DONE': 6,
                               'UPDATE_CATALOG_STARTED': 7,
                               'UPDATE_CATALOG_DONE': 8,
                               'SETUP_EXPANSION_SCHEMA_STARTED': 9,
                               'SETUP_EXPANSION_SCHEMA_DONE': 10,
                               'PREPARE_EXPANSION_SCHEMA_STARTED': 11,
                               'PREPARE_EXPANSION_SCHEMA_DONE': 12,
                               'EXPANSION_PREPARE_DONE': 13
                               }
        self._status = []
        self._status_info = []
        self._master_data_directory = master_data_directory
        self._master_mirror = master_mirror
        self._status_filename = master_data_directory + '/gpexpand.status'
        self._status_standby_filename = master_data_directory + '/gpexpand.standby.status'
        self._fp = None
        self._fp_standby = None
        self._temp_dir = None
        self._input_filename = None
        self._gp_segment_configuration_backup = None

        if os.path.exists(self._status_filename):
            self._read_status_file()

    def _read_status_file(self):
        """Reads in an existing gpexpand status file"""
        self.logger.debug("Trying to read in a pre-existing gpexpand status file")
        try:
            self._fp = open(self._status_filename, 'a+')
            self._fp.seek(0)

            for line in self._fp:
                (status, status_info) = line.rstrip().split(':')
                if status == 'BUILD_SEGMENT_TEMPLATE_STARTED':
                    self._temp_dir = status_info
                elif status == 'BUILD_SEGMENTS_STARTED':
                    self._seg_tarfile = status_info
                elif status == 'BUILD_SEGMENTS_DONE':
                    self._number_new_segments = status_info
                elif status == 'EXPANSION_PREPARE_STARTED':
                    self._input_filename = status_info
                elif status == 'UPDATE_CATALOG_STARTED':
                    self._gp_segment_configuration_backup = status_info

                self._status.append(status)
                self._status_info.append(status_info)
        except IOError:
            raise

        if not self._status_values.has_key(self._status[-1]):
            raise InvalidStatusError('Invalid status file.  Unknown status %s' % self._status)

    def create_status_file(self):
        """Creates a new gpexpand status file"""
        try:
            self._fp = open(self._status_filename, 'w')
            if self._master_mirror:
                self._fp_standby = open(self._status_standby_filename, 'w')
                self._fp_standby.write('UNINITIALIZED:None\n')
                self._fp_standby.flush()
            self._fp.write('UNINITIALIZED:None\n')
            self._fp.flush()
            self._status.append('UNINITIALIZED')
            self._status_info.append('None')
        except IOError:
            raise

        self._sync_status_file()

    def _sync_status_file(self):
        """Syncs the gpexpand status file with the master mirror"""
        if self._master_mirror:
            cpCmd = Scp('gpexpand copying status file to master mirror',
                        srcFile=self._status_standby_filename,
                        dstFile=self._status_filename,
                        dstHost=self._master_mirror.getSegmentHostName())
            cpCmd.run(validateAfter=True)

    def set_status(self, status, status_info=None):
        """Sets the current status.  gpexpand status must be set in
           proper order.  Any out-of-order status results in an
           InvalidStatusError exception"""
        self.logger.debug("Transitioning from %s to %s" % (self._status[-1], status))

        if not self._fp:
            raise InvalidStatusError('The status file is invalid and cannot be written to')
        if not self._status_values.has_key(status):
            raise InvalidStatusError('%s is an invalid gpexpand status' % status)
        # Only allow state transitions forward by exactly one step
        if self._status and \
                        self._status_values[status] != self._status_values[self._status[-1]] + 1:
            raise InvalidStatusError('Invalid status transition from %s to %s' % (self._status[-1], status))
        if self._master_mirror:
            self._fp_standby.write('%s:%s\n' % (status, status_info))
            self._fp_standby.flush()
            self._sync_status_file()
        self._fp.write('%s:%s\n' % (status, status_info))
        self._fp.flush()
        self._status.append(status)
        self._status_info.append(status_info)

    def get_current_status(self):
        """Gets the current status that has been written to the gpexpand
           status file"""
        if (len(self._status) > 0 and len(self._status_info) > 0):
            return (self._status[-1], self._status_info[-1])
        else:
            return (None, None)

    def get_status_history(self):
        """Gets the full status history"""
        return zip(self._status, self._status_info)

    def remove_status_file(self):
        """Closes and removes the gpexand status file"""
        if self._fp:
            self._fp.close()
            self._fp = None
        if self._fp_standby:
            self._fp_standby.close()
            self._fp_standby = None
        if os.path.exists(self._status_filename):
            os.unlink(self._status_filename)
        if os.path.exists(self._status_standby_filename):
            os.unlink(self._status_standby_filename)
        if self._master_mirror:
            RemoveFile.remote('gpexpand master mirror status file cleanup',
                              self._master_mirror.getSegmentHostName(),
                              self._status_filename)

    def remove_segment_configuration_backup_file(self):
        """ Remove the segment configuration backup file """
        self.logger.debug("Removing segment configuration backup file")
        if self._gp_segment_configuration_backup is not None and os.path.exists(
                self._gp_segment_configuration_backup):
            os.unlink(self._gp_segment_configuration_backup)

    def get_temp_dir(self):
        """Gets temp dir that was used during template creation"""
        return self._temp_dir

    def get_input_filename(self):
        """Gets input file that was used by expansion setup"""
        return self._input_filename

    def get_seg_tarfile(self):
        """Gets tar file that was used during template creation"""
        return self._seg_tarfile

    def get_number_new_segments(self):
        """ Gets the number of new segments added """
        return self._number_new_segments

    def get_gp_segment_configuration_backup(self):
        """Gets the filename of the gp_segment_configuration backup file
        created during expansion setup"""
        return self._gp_segment_configuration_backup

    def set_gp_segment_configuration_backup(self, filename):
        """Sets the filename of the gp_segment_configuration backup file"""
        self._gp_segment_configuration_backup = filename

    def is_standby(self):
        """Returns True if running on standby"""
        return os.path.exists(self._master_data_directory + self._status_standby_filename)

    def can_rollback(self, status):
        """Return if it can rollback under current status"""
        if int(self._status_values[status]) > int(self._status_values['UPDATE_CATALOG_DONE']):
            return False
        return True


# -------------------------------------------------------------------------

class ExpansionError(Exception): pass


class SegmentTemplateError(Exception): pass


# -------------------------------------------------------------------------
class SegmentTemplate:
    """Class for creating, distributing and deploying new segments to an
    existing GPDB array"""

    def __init__(self, logger, statusLogger, pool,
                 gparray, masterDataDirectory,
                 dburl, conn, tempDir, batch_size,
                 segTarDir='.', schemaTarFile='gpexpand_schema.tar'):
        self.logger = logger
        self.statusLogger = statusLogger
        self.pool = pool
        self.gparray = gparray
        self.tempDir = tempDir
        self.batch_size = batch_size
        self.dburl = dburl
        self.conn = conn
        self.masterDataDirectory = masterDataDirectory
        self.schema_tar_file = schemaTarFile
        self.maxDbId = self.gparray.get_max_dbid()
        self.segTarDir = segTarDir
        self.segTarFile = os.path.join(segTarDir, self.schema_tar_file)

        hosts = []
        for seg in self.gparray.getExpansionSegDbList():
            hosts.append(seg.getSegmentHostName())
        self.hosts = SegmentTemplate.consolidate_hosts(pool, hosts)
        logger.debug('Hosts: %s' % self.hosts)

    @staticmethod
    def consolidate_hosts(pool, hosts):
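        # Run `hostname` on every unique address and return the de-duplicated
        # list of canonical hostnames, so each physical host appears only once.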
        tmpHosts = {}
        consolidatedHosts = []

        for host in hosts:
            tmpHosts[host] = 0

        for host in tmpHosts.keys():
            hostnameCmd = Hostname('gpexpand associating hostnames with segments', ctxt=REMOTE, remoteHost=host)
            pool.addCommand(hostnameCmd)

        pool.join()

        finished_cmds = pool.getCompletedItems()

        for cmd in finished_cmds:
            if not cmd.was_successful():
                raise SegmentTemplateError(cmd.get_results())
            if cmd.get_hostname() not in consolidatedHosts:
                logger.debug('Adding %s to host list' % cmd.get_hostname())
                consolidatedHosts.append(cmd.get_hostname())

        return consolidatedHosts

    def build_segment_template(self):
        """Builds segment template tar file"""
        self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_STARTED', self.tempDir)
        self._create_template()
        self._fixup_template()
        self._tar_template()
        self.statusLogger.set_status('BUILD_SEGMENT_TEMPLATE_DONE')

    def build_new_segments(self):
        """Deploys the template tar file and configures the new segments"""
        self.statusLogger.set_status('BUILD_SEGMENTS_STARTED', self.segTarFile)
        self._distribute_template()
        # FIXME: truncate the qd only tables' underlying files instead of delete the tuples
        self._configure_new_segments()
        numNewSegments = len(self.gparray.getExpansionSegDbList())
        self.statusLogger.set_status('BUILD_SEGMENTS_DONE', numNewSegments)

    def _create_template(self):
        """Creates the schema template that is used by new segments"""
        self.logger.info('Creating segment template')

        MakeDirectory.local('gpexpand create temp dir', self.tempDir)

        self._select_src_segment()

        self.oldSegCount = self.gparray.get_segment_count()

        self.conn.close()

        try:
            masterSeg = self.gparray.master
            masterSeg.createTemplate(dstDir=self.tempDir)
        except Exception, msg:
            raise SegmentTemplateError(msg)

    def _select_src_segment(self):
        """Gets a segment to use as a source for pg_hba.conf
        and postgresql.conf files"""
        segPair = self.gparray.segmentPairs[0]
        if segPair.primaryDB.valid:
            self.srcSegHostname = segPair.primaryDB.getSegmentHostName()
            self.srcSegDataDir = segPair.primaryDB.getSegmentDataDirectory()
        elif segPair.mirrorDB and segPair.mirrorDB.valid:
            self.srcSegHostname = segPair.mirrorDB.getSegmentHostName()
            self.srcSegDataDir = segPair.mirrorDB.getSegmentDataDirectory()
        else:
            raise SegmentTemplateError("no valid segdb for content=0 to use as a template")

    def _distribute_template(self):
        """Distributes the template tar file to the new segments and expands it"""
        self.logger.info('Distributing template tar file to new hosts')

        self._distribute_tarfile()

    def _distribute_tarfile(self):
        """Distributes template tar file to hosts"""
        for host in self.hosts:
            logger.debug('Copying tar file to %s' % host)
            cpCmd = Scp(name='gpexpand distribute tar file to new hosts',
                        srcFile=self.schema_tar_file,
                        dstFile=self.segTarDir,
                        dstHost=host)
            self.pool.addCommand(cpCmd)

        self.pool.join()
        self.pool.check_results()

    def _start_new_primary_segments(self):
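        # Start each new primary segment by itself in utility mode; mirrors are
        # skipped here and configured later.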
        newSegments = self.gparray.getExpansionSegDbList()
        for seg in newSegments:
            if seg.isSegmentMirror():
                continue
            """ Start all the new segments in utilty mode. """
            segStartCmd = SegmentStart(
                name="Starting new segment dbid %s on host %s." % (str(seg.getSegmentDbId()), seg.getSegmentHostName())
                , gpdb=seg
                , numContentsInCluster=0  # Starting seg on its own.
                , era=None
                , mirrormode=MIRROR_MODE_MIRRORLESS
                , utilityMode=True
                , specialMode='convertMasterDataDirToSegment'
                , ctxt=REMOTE
                , remoteHost=seg.getSegmentHostName()
                , pg_ctl_wait=True
                , timeout=SEGMENT_TIMEOUT_DEFAULT)
            self.pool.addCommand(segStartCmd)
        self.pool.join()
        self.pool.check_results()

    def _stop_new_primary_segments(self):
        newSegments = self.gparray.getExpansionSegDbList()
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            segStopCmd = SegmentStop(
                name="Stopping new segment dbid %s on host %s." % (str(seg.getSegmentDbId), seg.getSegmentHostName())
                , dataDir=seg.getSegmentDataDirectory()
                , mode='smart'
                , nowait=False
                , ctxt=REMOTE
                , remoteHost=seg.getSegmentHostName()
            )
            self.pool.addCommand(segStopCmd)
        self.pool.join()
        self.pool.check_results()

    def _configure_new_segments(self):
        """Configures new segments.  This includes modifying the postgresql.conf file
        and setting up the gp_id table"""

        self.logger.info('Configuring new segments (primary)')
        new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(self.gparray.getExpansionSegDbList(),
                                                                             primaryMirror='primary')
        self.logger.info(new_segment_info)
        for host in iter(new_segment_info):
            segCfgCmd = ConfigureNewSegment(name='gpexpand configure new segments', confinfo=new_segment_info[host],
                                            tarFile=self.segTarFile, newSegments=True,
                                            verbose=gplog.logging_is_verbose(), batchSize=self.batch_size,
                                            ctxt=REMOTE, remoteHost=host)
            self.pool.addCommand(segCfgCmd)

        self.pool.join()
        self.pool.check_results()

        self._start_new_primary_segments()

        self.logger.info('Configuring new segments (mirror)')

810 811
        mirrorsList = []
        for segPair in self.gparray.getExpansionSegPairList():
S
                continue
814 815 816 817 818 819 820
            mirror = segPair.mirrorDB
            mirror.primaryHostname = segPair.primaryDB.getSegmentHostName()
            mirror.primarySegmentPort = segPair.primaryDB.getSegmentPort()
            mirrorsList.append(mirror)

        new_segment_info = ConfigureNewSegment.buildSegmentInfoForNewSegment(mirrorsList, primaryMirror='mirror')

        for host in iter(new_segment_info):
            segCfgCmd = ConfigureNewSegment(name='gpexpand configure new segments', confinfo=new_segment_info[host],
                                            tarFile=self.schema_tar_file, newSegments=True,
                                            verbose=gplog.logging_is_verbose(), batchSize=self.batch_size,
                                            ctxt=REMOTE, remoteHost=host, validationOnly=False)
            self.pool.addCommand(segCfgCmd)

        self.pool.join()
        self.pool.check_results()

        self._stop_new_primary_segments()

    def _fixup_template(self):
        """Copies postgresql.conf and pg_hba.conf files from a valid segment on the system.
        Then modifies the template copy of pg_hba.conf"""

        self.logger.info('Copying postgresql.conf from existing segment into template')

        localHostname = self.gparray.master.getSegmentHostName()
        cmdName = 'gpexpand copying postgresql.conf from %s:%s/postgresql.conf' \
                  % (self.srcSegHostname, self.srcSegDataDir)
        cpCmd = Scp(name=cmdName, srcFile=self.srcSegDataDir + '/postgresql.conf',
            dstFile=self.tempDir, dstHost=localHostname, ctxt=REMOTE,
            remoteHost=self.srcSegHostname)
        cpCmd.run(validateAfter=True)

        self.logger.info('Copying pg_hba.conf from existing segment into template')
848 849 850 851 852
        cmdName = 'gpexpand copy pg_hba.conf to %s:%s/pg_hba.conf' \
                  % (self.srcSegHostname, self.srcSegDataDir)
        cpCmd = Scp(name=cmdName, srcFile=self.srcSegDataDir + '/pg_hba.conf',
                    dstFile=self.tempDir, dstHost=localHostname,ctxt=REMOTE,
                    remoteHost=self.srcSegHostname)
        cpCmd.run(validateAfter=True)

        # Don't need log files and gpperfmon files in template.
        rmCmd = RemoveDirectory('gpexpand remove gpperfmon data from template',
                                self.tempDir + '/gpperfmon/data')
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveDirectoryContents('gpexpand remove logs from template',
                                        self.tempDir + '/pg_log')
        rmCmd.run(validateAfter=True)

        # other files not needed
        rmCmd = RemoveFile('gpexpand remove postmaster.opts from template',
                            self.tempDir + '/postmaster.opts')
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveFile('gpexpand remove postmaster.pid from template',
                            self.tempDir + '/postmaster.pid')
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveGlob('gpexpand remove gpexpand files from template',
                            self.tempDir + '/gpexpand.*')
        rmCmd.run(validateAfter=True)

    def _tar_template(self):
        """Tars up the template files"""
        self.logger.info('Creating schema tar file')
        tarCmd = CreateTar('gpexpand tar segment template', self.tempDir, self.schema_tar_file)
        tarCmd.run(validateAfter=True)

    @staticmethod
    def cleanup_build_segment_template(tarFile, tempDir):
        """Reverts the work done by build_segment_template.  Deletes the temp
        directory and local tar file"""
        rmCmd = RemoveDirectory('gpexpand remove temp dir: %s' % tempDir, tempDir)
        rmCmd.run(validateAfter=True)
        rmCmd = RemoveFile('gpexpand remove segment template file', tarFile)
        rmCmd.run(validateAfter=True)

    @staticmethod
    def cleanup_build_new_segments(pool, tarFile, gparray, hosts=None, removeDataDirs=False):
        """Cleans up the work done by build_new_segments.  Deletes remote tar files and
        and removes remote data directories"""

        if not hosts:
            hosts = []
            for seg in gparray.getExpansionSegDbList():
                hosts.append(seg.getSegmentHostName())

        # Remove template tar file
        for host in hosts:
            rmCmd = RemoveFile('gpexpand remove segment template file on host: %s' % host,
                               tarFile, ctxt=REMOTE, remoteHost=host)
            pool.addCommand(rmCmd)

        if removeDataDirs:
            for seg in gparray.getExpansionSegDbList():
                hostname = seg.getSegmentHostName()
                datadir = seg.getSegmentDataDirectory()
                rmCmd = RemoveDirectory('gpexpand remove new segment data directory: %s:%s' % (hostname, datadir),
                                        datadir, ctxt=REMOTE, remoteHost=hostname)
                pool.addCommand(rmCmd)
        pool.join()
        pool.check_results()

    def cleanup(self):
        """Cleans up temporary files from the local system and new segment hosts"""

        self.logger.info('Cleaning up temporary template files')
        SegmentTemplate.cleanup_build_segment_template(self.schema_tar_file, self.tempDir)
        SegmentTemplate.cleanup_build_new_segments(self.pool, self.segTarFile, self.gparray, self.hosts)


# ------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------
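# Value object describing one new segment as read from (or written to) the
# gpexpand input file.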
class NewSegmentInput:
    def __init__(self, hostname, address, port, datadir, dbid, contentId, role):
        self.hostname = hostname
        self.address = address
        self.port = port
        self.datadir = datadir
        self.dbid = dbid
        self.contentId = contentId
        self.role = role


# ------------------------------------------------------------------------------------------------------
# ------------------------------------------------------------------------------------------------------
class gpexpand:
    def __init__(self, logger, gparray, dburl, options, parallel=1):
        self.pastThePointOfNoReturn = False
        self.logger = logger
        self.dburl = dburl
        self.options = options
        self.numworkers = parallel
        self.gparray = gparray
        self.unique_index_tables = {}
        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods=True)
        self.old_segments = self.gparray.getSegDbList()
        if dburl.pgdb == 'template0' or dburl.pgdb == 'template1' or dburl.pgdb == 'postgres':
            raise ExpansionError("Invalid database '%s' specified.  Cannot use a template database.\n"
                                 "Please set the environment variable PGDATABASE to a different "
                                 "database or use the -D option to specify a database and re-run" % dburl.pgdb)

        datadir = self.gparray.master.getSegmentDataDirectory()
        self.statusLogger = GpExpandStatus(logger=logger,
                                           master_data_directory=datadir,
                                           master_mirror=self.gparray.standbyMaster)

        # Adjust batch size if it's too high given the number of segments
        seg_count = len(self.old_segments)
        if self.options.batch_size > seg_count:
            self.options.batch_size = seg_count
        self.pool = WorkerPool(numWorkers=self.options.batch_size)

        self.tempDir = self.statusLogger.get_temp_dir()
        if not self.tempDir:
            self.tempDir = createTempDirectoryName(self.options.master_data_directory, "gpexpand")
        self.queue = None
        self.segTemplate = None

    @staticmethod
    def prepare_gpdb_state(logger, dburl, options):
        """ Gets GPDB in the appropriate state for an expansion.
        This state will depend on if this is a new expansion setup,
        a continuation of a previous expansion or a rollback """
        # Get the database in the expected state for the expansion/rollback
        # If the gpexpand status file exists, the last run of gpexpand didn't finish properly
        status_file_exists = os.path.exists(options.master_data_directory + '/gpexpand.status')
        gpexpand_db_status = None

        if not status_file_exists:
            logger.info('Querying gpexpand schema for current expansion state')
            try:
                gpexpand_db_status = gpexpand.get_status_from_db(dburl, options)
            except Exception, e:
                raise Exception('Error while trying to query the gpexpand schema: %s' % e)
            logger.debug('Expansion status returned is %s' % gpexpand_db_status)

        return gpexpand_db_status

    @staticmethod
    def get_status_from_db(dburl, options):
        """Gets gpexpand status from the gpexpand schema"""
        status_conn = None
        gpexpand_db_status = None
        if get_local_db_mode(options.master_data_directory) == 'NORMAL':
            try:
                status_conn = dbconn.connect(dburl, encoding='UTF8')
                # Get the last status entry
                cursor = dbconn.execSQL(status_conn, 'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
                if cursor.rowcount == 1:
                    gpexpand_db_status = cursor.fetchone()[0]

            except Exception:
                # expansion schema doesn't exist or there was a connection failure.
                pass
            finally:
                if status_conn: status_conn.close()

        # make sure gpexpand schema doesn't exist since it wasn't in DB provided
        if not gpexpand_db_status:
            """
            MPP-14145 - If there's no discernable status, the schema must not exist.

            The checks in get_status_from_db claim to look for existence of the 'gpexpand' schema, but more accurately they're
            checking for non-emptiness of the gpexpand.status table. If the table were empty, but the schema did exist, gpexpand would presume
            a new expansion was taking place and it would try to CREATE SCHEMA later, which would fail. So, here, if this is the case, we error out.

            Note: -c/--clean will not necessarily work either, as it too has assumptions about the non-emptiness of the gpexpand schema.
            """
            with dbconn.connect(dburl, encoding='UTF8', utility=True) as conn:
                count = dbconn.execSQLForSingleton(conn,
                                                   "SELECT count(n.nspname) FROM pg_catalog.pg_namespace n WHERE n.nspname = 'gpexpand'")
                if count > 0:
                    raise ExpansionError(
                        "Existing expansion state could not be determined, but a gpexpand schema already exists. Cannot proceed.")

            # now determine whether gpexpand schema merely resides in another DB
            status_conn = dbconn.connect(dburl, encoding='UTF8')
            db_list = catalog.getDatabaseList(status_conn)
            status_conn.close()

            for db in db_list:
                dbname = db[0]
                if dbname in ['template0', 'template1', 'postgres', dburl.pgdb]:
                    continue
                logger.debug('Looking for gpexpand schema in %s' % dbname.decode('utf-8'))
                test_url = copy.deepcopy(dburl)
                test_url.pgdb = dbname
                c = dbconn.connect(test_url, encoding='UTF8')
                try:
                    cursor = dbconn.execSQL(c, 'SELECT status FROM gpexpand.status ORDER BY updated DESC LIMIT 1')
                except:
                    # Not in here
                    pass
                else:
                    raise ExpansionError("""gpexpand schema exists in database %s, not in %s.
Set PGDATABASE or use the -D option to specify the correct database to use.""" % (
                        dbname.decode('utf-8'), options.database))
                finally:
                    if c:
                        c.close()

        return gpexpand_db_status

    def validate_max_connections(self):
        conn = None
        try:
            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
            max_connections = int(catalog.getSessionGUC(conn, 'max_connections'))
        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error('Failed to check max_connections GUC')
            if conn: conn.close()
            raise ex

        if max_connections < self.options.parallel * 2 + 1:
            self.logger.error('max_connections is too small to expand %d tables at' % self.options.parallel)
            self.logger.error('a time.  This will lead to connection errors.  Either')
            self.logger.error('reduce the value for -n passed to gpexpand or raise')
            self.logger.error('max_connections in postgresql.conf')
            return False

        return True

    def validate_unalterable_tables(self):
        conn = None
        unalterable_tables = []

        try:
            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
            databases = catalog.getDatabaseList(conn)
            conn.close()

            tempurl = copy.deepcopy(self.dburl)
            for db in databases:
                if db[0] == 'template0':
                    continue
                self.logger.info('Checking database %s for unalterable tables...' % db[0].decode('utf-8'))
                tempurl.pgdb = db[0]
                conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
                cursor = dbconn.execSQL(conn, unalterable_table_sql)
                for row in cursor:
                    unalterable_tables.append(row)
                cursor.close()
                conn.close()

        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error('Failed to check for unalterable tables.')
            if conn: conn.close()
            raise ex

        if len(unalterable_tables) > 0:
            self.logger.error('The following tables cannot be altered because they contain')
            self.logger.error('dropped columns of user defined types:')
            for t in unalterable_tables:
                self.logger.error('\t%s.%s' % (t[0].decode('utf-8'), t[1].decode('utf-8')))
            self.logger.error('Please consult the documentation for instructions on how to')
            self.logger.error('correct this issue, then run gpexpand again')
            return False

        return True

    def check_unique_indexes(self):
        """ Checks if there are tables with unique indexes.
        Returns true if unique indexes exist"""

        conn = None
        has_unique_indexes = False

        try:
            conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
            databases = catalog.getDatabaseList(conn)
            conn.close()

            tempurl = copy.deepcopy(self.dburl)
            for db in databases:
                if db[0] == 'template0':
                    continue
                self.logger.info('Checking database %s for tables with unique indexes...' % db[0].decode('utf-8'))
                tempurl.pgdb = db[0]
                conn = dbconn.connect(tempurl, utility=True, encoding='UTF8')
                cursor = dbconn.execSQL(conn, has_unique_index_sql)
                for row in cursor:
                    has_unique_indexes = True
                    self.unique_index_tables[row[0]] = True
                cursor.close()
                conn.close()

        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error('Failed to check for unique indexes.')
            if conn: conn.close()
            raise ex

        return has_unique_indexes

    def rollback(self, dburl):
        """Rolls back and expansion setup that didn't successfully complete"""
        status_history = self.statusLogger.get_status_history()
        if not status_history:
            raise ExpansionError('No status history to rollback.')

        if (status_history[-1])[0] == 'EXPANSION_PREPARE_DONE':
            raise ExpansionError('Expansion preparation complete.  Nothing to rollback')

        for status in reversed(status_history):
            if not self.statusLogger.can_rollback(status[0]):
                raise ExpansionError('Catalog has been changed, the cluster cannot be rolled back.')

            elif status[0] == 'BUILD_SEGMENT_TEMPLATE_STARTED':
                if self.statusLogger.is_standby():
                    self.logger.info('Running on standby master, skipping segment template rollback')
                    continue
                self.logger.info('Rolling back segment template build')
                SegmentTemplate.cleanup_build_segment_template('gpexpand_schema.tar', status[1])

            elif status[0] == 'BUILD_SEGMENTS_STARTED':
                self.logger.info('Rolling back building of new segments')
                newSegList = self.read_input_files(self.statusLogger.get_input_filename())
                self.addNewSegments(newSegList)
                SegmentTemplate.cleanup_build_new_segments(self.pool,
                                                           self.statusLogger.get_seg_tarfile(),
                                                           self.gparray, removeDataDirs=True)

            elif status[0] == 'UPDATE_CATALOG_STARTED':
                self.logger.info('Rolling back master update')
                self.restore_master()
                self.gparray = GpArray.initFromCatalog(dburl, utility=True)

            else:
                self.logger.debug('Skipping %s' % status[0])

        self.conn.close()

        self.statusLogger.remove_status_file()
        self.statusLogger.remove_segment_configuration_backup_file()

    def get_state(self):
        """Returns expansion state from status logger"""
        return self.statusLogger.get_current_status()[0]

    def generate_inputfile(self):
        """Writes a gpexpand input file based on expansion segments
        added to gparray by the gpexpand interview"""
        outputfile = 'gpexpand_inputfile_' + strftime("%Y%m%d_%H%M%S")
        outfile = open(outputfile, 'w')

        logger.info("Generating input file...")

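        # One line per new segment:
        #   hostname:address:port:datadir:dbid:content:preferred_role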
        for db in self.gparray.getExpansionSegDbList():
            tempStr = "%s:%s:%d:%s:%d:%d:%s" % (canonicalize_address(db.getSegmentHostName())
                                                , canonicalize_address(db.getSegmentAddress())
                                                , db.getSegmentPort()
                                                , db.getSegmentDataDirectory()
                                                , db.getSegmentDbId()
                                                , db.getSegmentContentId()
                                                , db.getSegmentPreferredRole()
                                                )
            outfile.write(tempStr + "\n")

        outfile.close()

        return outputfile

    def addNewSegments(self, inputFileEntryList):
        for seg in inputFileEntryList:
            self.gparray.addExpansionSeg(content=int(seg.contentId)
                                         , preferred_role=seg.role
                                         , dbid=int(seg.dbid)
                                         , role=seg.role
                                         , hostname=seg.hostname.strip()
                                         , address=seg.address.strip()
                                         , port=int(seg.port)
                                         , datadir=os.path.abspath(seg.datadir.strip())
                                         )
        try:
            self.gparray.validateExpansionSegs()
        except Exception, e:
            raise ExpansionError('Invalid input file: %s' % e)

    def read_input_files(self, inputFilename=None):
        """Reads and validates line format of the input file passed
        in on the command line via the -i arg"""
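        # An input line is expected to carry the same seven colon separated fields
        # that generate_inputfile() writes:
        #   hostname:address:port:datadir:dbid:contentId:role
        # parse_gpexpand_segment_line() splits the line; the checks below validate each value.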
        retValue = []
        if not self.options.filename and not inputFilename:
            raise ExpansionError('Missing input file')

        if self.options.filename:
            inputFilename = self.options.filename
        f = None

        try:
            f = open(inputFilename, 'r')
            try:
                for line, l in line_reader(f):

                    hostname, address, port, datadir, dbid, contentId, role \
                        = parse_gpexpand_segment_line(inputFilename, line, l)

                    # Check that input values look reasonable.
                    if hostname is None or len(hostname) == 0:
                        raise ExpansionError("Invalid host name on line " + str(line))
                    if address is None or len(address) == 0:
                        raise ExpansionError("Invalid address on line " + str(line))
                    if port is None or str(port).isdigit() == False or int(port) < 0:
                        raise ExpansionError("Invalid port number on line " + str(line))
                    if datadir is None or len(datadir) == 0:
                        raise ExpansionError("Invalid data directory on line " + str(line))
                    if dbid is None or str(dbid).isdigit() == False or int(dbid) < 0:
                        raise ExpansionError("Invalid dbid on line " + str(line))
                    if contentId is None or str(contentId).isdigit() == False or int(contentId) < 0:
                        raise ExpansionError("Invalid contentId on line " + str(line))
                    if role is None or len(role) > 1 or (role != 'p' and role != 'm'):
                        raise ExpansionError("Invalid role on line " + str(line))

                    retValue.append(NewSegmentInput(hostname=hostname
                                                    , port=port
                                                    , address=address
                                                    , datadir=datadir
                                                    , dbid=dbid
                                                    , contentId=contentId
                                                    , role=role
                                                    ))
            except ValueError:
                raise ExpansionError('Missing or invalid value on line %d.' % line)
            except Exception, e:
                raise ExpansionError('Invalid input file on line %d: %s' % (line, str(e)))
            finally:
                f.close()
            return retValue
        except IOError:
            raise ExpansionError('Input file %s not found' % inputFilename)

    def lock_catalog(self):
        self.conn_catalog_lock = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        self.logger.info('Locking catalog')
        dbconn.execSQL(self.conn_catalog_lock, "BEGIN")
        # FIXME: is CHECKPOINT inside BEGIN the one wanted by us?
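        # Presumed intent: gp_expand_lock_catalog() blocks further catalog changes for
        # the lifetime of this transaction (released by unlock_catalog()'s COMMIT), and
        # the CHECKPOINT flushes the locked catalog state to disk before the segment
        # template is built from the master data directory.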
        dbconn.execSQL(self.conn_catalog_lock, "select gp_expand_lock_catalog()")
        dbconn.execSQL(self.conn_catalog_lock, "CHECKPOINT")
        self.logger.info('Locked catalog')

    def unlock_catalog(self):
        self.logger.info('Unlocking catalog')
        dbconn.execSQL(self.conn_catalog_lock, "COMMIT")
        self.conn_catalog_lock.close()
        self.conn_catalog_lock = None
        self.logger.info('Unlocked catalog')

    def add_segments(self):
        """Starts the process of adding the new segments to the array"""
        self.segTemplate = SegmentTemplate(logger=self.logger,
                                           statusLogger=self.statusLogger,
                                           pool=self.pool,
                                           gparray=self.gparray,
                                           masterDataDirectory=self.options.master_data_directory,
                                           dburl=self.dburl,
                                           conn=self.conn,
                                           tempDir=self.tempDir,
                                           segTarDir=self.options.tardir,
                                           batch_size=self.options.batch_size)
        try:
            self.segTemplate.build_segment_template()
            self.segTemplate.build_new_segments()
        except SegmentTemplateError, msg:
            raise ExpansionError(msg)

    def update_original_segments(self):
        """Updates the gp_id catalog table of existing hosts"""

        # Update the gp_id of original segments
        self.newPrimaryCount = 0
        for seg in self.gparray.getExpansionSegDbList():
            if seg.isSegmentPrimary(False):
                self.newPrimaryCount += 1

        self.newPrimaryCount += self.gparray.get_primary_count()
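        # newPrimaryCount now holds the primary segment count of the post-expansion
        # cluster; it is later passed as numContentsInCluster when the new segments
        # are started in utility mode (see cleanup_new_segments).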

        if self.segTemplate:
            self.segTemplate.cleanup()

        # FIXME: update postmaster.opts

    def update_catalog(self):
        """
        Starts the database, calls updateSystemConfig() to set up
        the catalog tables and get the actual dbid and content id
        for the new segments.
        """
        self.statusLogger.set_gp_segment_configuration_backup(
            self.options.master_data_directory + '/' + SEGMENT_CONFIGURATION_BACKUP_FILE)
        self.gparray.dumpToFile(self.statusLogger.get_gp_segment_configuration_backup())
        self.statusLogger.set_status('UPDATE_CATALOG_STARTED', self.statusLogger.get_gp_segment_configuration_backup())

        # Put expansion segment primaries in change tracking
        for seg in self.gparray.getExpansionSegDbList():
            if seg.isSegmentMirror() == True:
                continue
            if self.gparray.get_mirroring_enabled() == True:
                seg.setSegmentMode(MODE_CHANGELOGGING)

        # Set expansion segment mirror state = down
        for seg in self.gparray.getExpansionSegDbList():
            if seg.isSegmentPrimary() == True:
                continue
            seg.setSegmentStatus(STATUS_DOWN)

        # Update the catalog
        configurationInterface.getConfigurationProvider().updateSystemConfig(
            self.gparray,
            "%s: segment config for resync" % getProgramName(),
            dbIdToForceMirrorRemoveAdd={},
            useUtilityMode=True,
            allowPrimary=True
        )

        # The content IDs may have changed, so we must make sure the array is in proper order.
        self.gparray.reOrderExpansionSegs()

        # Issue checkpoint due to forced shutdown below
        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        dbconn.execSQL(self.conn, "CHECKPOINT")
        self.conn.close()

        # increase expand version 
        self.conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        dbconn.execSQL(self.conn, "select gp_expand_bump_version()")
        self.conn.close()

        inject_fault('gpexpand rollback test fault injection')

        self.statusLogger.set_status('UPDATE_CATALOG_DONE')
        self.pastThePointOfNoReturn = True

    # --------------------------------------------------------------------------
    def cleanup_new_segments(self):
        """
        This method is called after all new segments have been configured.
        """
        self.logger.info('Cleaning up databases in new segments.')
        newSegments = self.gparray.getExpansionSegDbList()
        """ Get a list of databases. """
        conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8')
        databases = catalog.getDatabaseList(conn)
        conn.close()

        """
        Connect to each database in each new segment and clean up the catalog tables
        that still contain master-only data as a result of copying the segment from the master.
        Note: this functionality used to be in segcopy and was therefore done just once,
        to the original copy of the master.
        """
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            """ Start all the new segments in utility mode. """
            segStartCmd = SegmentStart(
                name="Starting new segment dbid %s on host %s." % (str(seg.getSegmentDbId()), seg.getSegmentHostName())
                , gpdb=seg
                , numContentsInCluster=self.newPrimaryCount  # Starting seg on its own.
                , era=None
                , mirrormode=MIRROR_MODE_MIRRORLESS
                , utilityMode=True
                , ctxt=REMOTE
                , remoteHost=seg.getSegmentHostName()
                , pg_ctl_wait=True
                , timeout=SEGMENT_TIMEOUT_DEFAULT)
            self.pool.addCommand(segStartCmd)
        self.pool.join()
        self.pool.check_results()
        """
        Build the list of delete statements based on the MASTER_ONLY_TABLES
        defined in gpcatalog.py
        """
        statements = ["delete from pg_catalog.%s" % tab for tab in MASTER_ONLY_TABLES]
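        # Each entry expands to a statement of the form
        #   delete from pg_catalog.SOME_MASTER_ONLY_TABLE
        # e.g. "delete from pg_catalog.gp_segment_configuration", assuming that table
        # is listed in MASTER_ONLY_TABLES in gpcatalog.py.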

        """
          Connect to each database in the new segments, and clean up the catalog tables.
        """
        for seg in newSegments:
            if seg.isSegmentMirror() == True:
                continue
            for database in databases:
                if database[0] == 'template0':
                    continue
                dburl = dbconn.DbURL(hostname=seg.getSegmentHostName()
                                     , port=seg.getSegmentPort()
                                     , dbname=database[0]
                                     )
                name = "gpexpand execute segment cleanup commands. seg dbid = %s, command = %s" % (
                    seg.getSegmentDbId(), str(statements))
                execSQLCmd = ExecuteSQLStatementsCommand(name=name
                                                         , url=dburl
                                                         , sqlCommandList=statements
                                                         )
                self.pool.addCommand(execSQLCmd)
                self.pool.join()
                ### need to fix self.pool.check_results(). Call getCompletedItems to clear the queue for now.
                self.pool.check_results()
                self.pool.getCompletedItems()
    # --------------------------------------------------------------------------
    def restore_master(self):
        """Restores the gp_segment_configuration catalog table for rollback"""
        backupFile = self.statusLogger.get_gp_segment_configuration_backup()

        if not os.path.exists(backupFile):
            raise ExpansionError('gp_segment_configuration backup file %s does not exist' % backupFile)

        # Create a new gpArray from the backup file
        array = GpArray.initFromFile(backupFile)

        originalDbIds = ""
        originalDbIdsList = []
        first = True
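        # Build a comma separated string of the dbids that existed before the expansion,
        # e.g. "1, 2, 3, 4" for a four-segment cluster (values illustrative); any dbid or
        # content id not in this list must have been added by this expansion and is removed below.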
        for seg in array.getDbList():
            originalDbIdsList.append(int(seg.getSegmentDbId()))
            if first == False:
                originalDbIds += ", "
            first = False
            originalDbIds += str(seg.getSegmentDbId())
        if len(originalDbIds) > 0:
            # Update the catalog with the contents of the backup
            restore_conn = None
            try:
                restore_conn = dbconn.connect(self.dburl, utility=True, encoding='UTF8', allowSystemTableMods=True)

                # Get a list of all the expand primary segments
                sqlStr = "select dbid from pg_catalog.gp_segment_configuration where dbid not in (%s) and role = 'p'" % str(
                    originalDbIds)
                curs = dbconn.execSQL(restore_conn, sqlStr)
                deleteDbIdList = []
                rows = curs.fetchall()
                for row in rows:
                    deleteDbIdList.append(int(row[0]))

                # Get a list of all the expand mirror segments
                sqlStr = "select content from pg_catalog.gp_segment_configuration where dbid not in (%s) and role = 'm'" % str(
                    originalDbIds)
                curs = dbconn.execSQL(restore_conn, sqlStr)
                deleteContentList = []
                rows = curs.fetchall()
                for row in rows:
                    deleteContentList.append(int(row[0]))

                #
                # The following is a sanity check to make sure we don't do something bad here.
                #
                if len(originalDbIdsList) < 2:
                    self.logger.error(
                        "The original dbid list is too small to be correct: %s " % str(len(originalDbIdsList)))
                    raise Exception("Unable to complete rollback")

                totalToDelete = len(deleteDbIdList) + len(deleteContentList)
                if int(totalToDelete) > int(self.statusLogger.get_number_new_segments()):
                    self.logger.error(
                        "There was a discrepancy between the number of expansion segments to roll back (%s) and the expected number of segments to roll back (%s)" \
                        % (str(totalToDelete), str(self.statusLogger.get_number_new_segments())))
                    self.logger.error("  Expanded primary segment dbids = %s", str(deleteDbIdList))
                    self.logger.error("  Expansion mirror content ids   = %s", str(deleteContentList))
                    raise Exception("Unable to complete rollback")

                for content in deleteContentList:
                    sqlStr = "select * from gp_remove_segment_mirror(%s::smallint)" % str(content)
                    dbconn.execSQL(restore_conn, sqlStr)

                for dbid in deleteDbIdList:
                    sqlStr = "select * from gp_remove_segment(%s::smallint)" % str(dbid)
                    dbconn.execSQL(restore_conn, sqlStr)

                restore_conn.commit()
            except Exception, e:
                raise Exception("Unable to restore master. Exception: " + str(e))
            finally:
                if restore_conn != None:
                    restore_conn.close()

    def sync_new_mirrors(self):
        """ This method will execute gprecoverseg so that all new segments sync with their mirrors."""
        if self.gparray.get_mirroring_enabled():
            self.logger.info('Starting new mirror segment synchronization')
            cmd = GpRecoverSeg(name="gpexpand syncing mirrors", options="-a -F")
            cmd.run(validateAfter=True)

    def start_prepare(self):
        """Inserts into gpexpand.status that expansion preparation has started."""
        if self.options.filename:
            self.statusLogger.create_status_file()
            self.statusLogger.set_status('EXPANSION_PREPARE_STARTED', os.path.abspath(self.options.filename))

    def finalize_prepare(self):
        """Removes the gpexpand status file and segment configuration backup file"""
        self.statusLogger.remove_status_file()
        self.statusLogger.remove_segment_configuration_backup_file()

    def setup_schema(self):
        """Used to setup the gpexpand schema"""
        self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_STARTED')
        self.logger.info('Creating expansion schema')
        self.conn = dbconn.connect(self.dburl, encoding='UTF8')
        dbconn.execSQL(self.conn, create_schema_sql)
        dbconn.execSQL(self.conn, status_table_sql)
        dbconn.execSQL(self.conn, status_detail_table_sql)
        # views
        if not self.options.simple_progress:
            dbconn.execSQL(self.conn, progress_view_sql)
        else:
            dbconn.execSQL(self.conn, progress_view_simple_sql)

        self.conn.commit()

        self.statusLogger.set_status('SETUP_EXPANSION_SCHEMA_DONE')

    def prepare_schema(self):
        """Prepares the gpexpand schema"""
        self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_STARTED')

        if not self.conn:
            self.conn = dbconn.connect(self.dburl, encoding='UTF8', allowSystemTableMods=True)
            self.gparray = GpArray.initFromCatalog(self.dburl)

        nowStr = datetime.datetime.now()
        statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP', '%s' ) " % (gpexpand_schema, status_table, nowStr)
        dbconn.execSQL(self.conn, statusSQL)

        db_list = catalog.getDatabaseList(self.conn)

        for db in db_list:
            dbname = db[0]
            if dbname == 'template0':
                continue
            self.logger.info('Populating %s.%s with data from database %s' % (
                gpexpand_schema, status_detail_table, dbname.decode('utf-8')))
            self._populate_regular_tables(dbname)
            self._populate_partitioned_tables(dbname)
            inject_fault('gpexpand MPP-14620 fault injection')

        nowStr = datetime.datetime.now()
        statusSQL = "INSERT INTO %s.%s VALUES ( 'SETUP DONE', '%s' ) " % (gpexpand_schema, status_table, nowStr)
        dbconn.execSQL(self.conn, statusSQL)

        self.conn.commit()

        self.conn.close()

        self.statusLogger.set_status('PREPARE_EXPANSION_SCHEMA_DONE')
        self.statusLogger.set_status('EXPANSION_PREPARE_DONE')

        # At this point, no rollback is possible and the system
        # including new segments has been started once before so finalize
        self.finalize_prepare()

    def _populate_regular_tables(self, dbname):
        """ we don't do 3.2+ style partitioned tables here, but we do
            all other table types.
        """

        src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))"
        sql = """SELECT
    quote_ident(n.nspname) || '.' || quote_ident(c.relname) as fq_name,
    n.oid as schemaoid,
    c.oid as tableoid,
    p.attrnums as distribution_policy,
    now() as last_updated,
    %s,
    p.policytype as distribution_policy_type,
    NULL as root_partition_name
FROM
            pg_class c
    JOIN pg_namespace n ON (c.relnamespace=n.oid)
    JOIN pg_catalog.gp_distribution_policy p on (c.oid = p.localoid)
    LEFT JOIN pg_partition pp on (c.oid=pp.parrelid)
    LEFT JOIN pg_partition_rule pr on (c.oid=pr.parchildrelid)
WHERE
    pp.parrelid is NULL
    AND pr.parchildrelid is NULL
    AND n.nspname != 'gpexpand'
    AND n.nspname != 'pg_bitmapindex'
    AND c.relstorage != 'x';

                  """ % (src_bytes_str)
        self.logger.debug(sql)
        table_conn = self.connect_database(dbname)
        curs = dbconn.execSQL(table_conn, sql)
        rows = curs.fetchall()
        try:
            sql_file = os.path.abspath('./%s.dat' % status_detail_table)
            self.logger.debug('status_detail data file: %s' % sql_file)
            fp = open(sql_file, 'w')
            for row in rows:
                fqname = row[0]
                schema_oid = row[1]
                table_oid = row[2]
                if row[3]:
                    self.logger.debug("dist policy raw: %s " % row[3].decode('utf-8'))
                else:
                    self.logger.debug("dist policy raw: NULL")
                dist_policy = row[3]
                (policy_name, policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
                rel_bytes = int(row[5])

                if dist_policy is None:
                    dist_policy = 'NULL'

                dist_policy_type = row[6]
                root_partition_name = row[7]
                full_name = '%s.%s' % (dbname, fqname)
                rank = 1 if self.unique_index_tables.has_key(full_name) else 2
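                # The tab separated line below matches the status_detail column order used
                # by ExpandTable: dbname, fq_name, schema_oid, table_oid, distrib_policy,
                # policy_names, policy_coloids, policy_type, root_partition_name,
                # storage_options, rank, status, expansion_started, expansion_finished, source_bytes.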

                fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\n""" % (
                    dbname, fqname, schema_oid, table_oid,
                    dist_policy, policy_name, policy_oids,
                    dist_policy_type, root_partition_name, rank, undone_status, rel_bytes))
        except Exception, e:
            raise ExpansionError(e)
        finally:
            if fp: fp.close()

        try:
            copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)

            self.logger.debug(copySQL)
            dbconn.execSQL(self.conn, copySQL)
        except Exception, e:
            raise ExpansionError(e)
        finally:
            os.unlink(sql_file)

        table_conn.commit()
        table_conn.close()

    def _populate_partitioned_tables(self, dbname):
        """
        Population of status_detail for partitioned tables. A leaf partition cannot
        have a different numsegments than its root partition, so the root partition
        must be expanded in one shot; only the root partition is populated for now.

        TODO:
        We used to use a tricky but effective way to expand leaf partitions in
        parallel; that approach is still under discussion. Keep the old method
        here in case we need to bring it back someday.

        Step1:
           BEGIN;
           Lock all root/interior/leaf partitions
           Change all numsegments of root/interior/leaf partitions to size of cluster;
           Change all leaf partition to random distributed;
           COMMIT;
        Step2:
           Change all leaf partition's policy back to old policy with a mandatory
           data movement.
        """
        src_bytes_str = "0" if self.options.simple_progress else "pg_relation_size(quote_ident(n.nspname) || '.' || quote_ident(c.relname))"
        sql = """
SELECT
    quote_ident(n.nspname) || '.' || quote_ident(c.relname) as fq_name,
    n.oid as schemaoid,
    c.oid as tableoid,
    d.attrnums as distributed_policy,
    now() as last_updated,
    %s,
    quote_ident(n.nspname) || '.' || quote_ident(c.relname) as root_partition_name
FROM
    pg_class c,
    pg_namespace n,
    pg_partition p,
    gp_distribution_policy d
WHERE
    c.relnamespace = n.oid
    AND p.parrelid = c.oid
    AND d.localoid = c.oid
    AND parlevel = 0
ORDER BY fq_name, tableoid desc;
                  """ % (src_bytes_str)
        self.logger.debug(sql)
        table_conn = self.connect_database(dbname)
        curs = dbconn.execSQL(table_conn, sql)
        rows = curs.fetchall()

        try:
            sql_file = os.path.abspath('./%s.dat' % status_detail_table)
            self.logger.debug('status_detail data file: %s' % sql_file)
            fp = open(sql_file, 'w')

            for row in rows:
                fqname = row[0]
                schema_oid = row[1]
                table_oid = row[2]
                if row[3]:
                    self.logger.debug("dist policy raw: %s " % row[3])
                else:
                    self.logger.debug("dist policy raw: NULL")
                dist_policy = row[3]
                (policy_name, policy_oids) = self.form_dist_policy_name(table_conn, row[3], table_oid)
                rel_bytes = int(row[5])

                if dist_policy is None:
                    dist_policy = 'NULL'

                dist_policy_type = 'p'
                root_partition_name = row[6]
                full_name = '%s.%s' % (dbname, fqname)
                rank = 1 if self.unique_index_tables.has_key(full_name) else 2

                fp.write("""%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\tNULL\t%d\t%s\tNULL\tNULL\t%d\n""" % (
                    dbname, fqname, schema_oid, table_oid,
                    dist_policy, policy_name, policy_oids,
                    dist_policy_type, root_partition_name, rank, undone_status, rel_bytes))
        except Exception:
            raise
        finally:
            if fp: fp.close()

        try:
            copySQL = """COPY %s.%s FROM '%s' NULL AS 'NULL'""" % (gpexpand_schema, status_detail_table, sql_file)

            self.logger.debug(copySQL)
            dbconn.execSQL(self.conn, copySQL)
        except Exception, e:
            raise ExpansionError(e)
        finally:
            os.unlink(sql_file)

        table_conn.commit()
        table_conn.close()

    def form_dist_policy_name(self, conn, rs_val, table_oid):
        if rs_val is None:
            return (None, None)
        rs_val = rs_val.lstrip('{').rstrip('}').strip()
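        # rs_val is the text form of gp_distribution_policy.attrnums, e.g. '{1,3}' (values
        # illustrative); after stripping the braces it is split on ',' and each attribute
        # number is mapped to its column name and oid via pg_attribute below.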
        namedict = {}
        oiddict = {}
        sql = "select attnum, attname, attrelid from pg_attribute where attrelid =  %s and attnum > 0" % table_oid
        cursor = dbconn.execSQL(conn, sql)
        for row in cursor:
            namedict[row[0]] = row[1]
            oiddict[row[0]] = row[2]

        name_list = []
        oid_list = []

        if rs_val != "":
            rs_list = rs_val.split(',')

            for colnum in rs_list:
                name_list.append(namedict[int(colnum)])
                oid_list.append(str(oiddict[int(colnum)]))

        return (' , '.join(name_list), ' , '.join(oid_list))

    def perform_expansion(self):
        """Performs the actual table re-organizations"""
        expansionStart = datetime.datetime.now()

        # setup a threadpool
        self.queue = WorkerPool(numWorkers=self.numworkers)
        # go through and reset any "IN PROGRESS" tables
        self.conn = dbconn.connect(self.dburl, encoding='UTF8')
        sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STARTED', '%s' ) " % (
            gpexpand_schema, status_table, expansionStart)
        cursor = dbconn.execSQL(self.conn, sql)
        self.conn.commit()

        sql = """UPDATE gpexpand.status_detail set status = '%s' WHERE status = '%s' """ % (undone_status, start_status)
        cursor = dbconn.execSQL(self.conn, sql)
        self.conn.commit()

        # read schema and queue up commands
        sql = "SELECT * FROM %s.%s WHERE status = 'NOT STARTED' ORDER BY rank" % (gpexpand_schema, status_detail_table)
        cursor = dbconn.execSQL(self.conn, sql)

        for row in cursor:
            self.logger.debug(row)
            name = "name"
            tbl = ExpandTable(options=self.options, row=row)
            cmd = ExpandCommand(name=name, status_url=self.dburl, table=tbl, options=self.options)
            self.queue.addCommand(cmd)

        table_expand_error = False

        stopTime = None
        stoppedEarly = False
        if self.options.end:
            stopTime = self.options.end
        # wait till done.
        while not self.queue.isDone():
            logger.debug(
                "woke up.  queue: %d finished %d  " % (self.queue.num_assigned, self.queue.completed_queue.qsize()))
            if stopTime and datetime.datetime.now() >= stopTime:
                stoppedEarly = True
                break
            time.sleep(5)

        expansionStopped = datetime.datetime.now()

        self.pool.haltWork()
        self.pool.joinWorkers()
        self.queue.haltWork()
        self.queue.joinWorkers()

        # Doing this after the halt and join workers guarantees that no new completed items can be added
        # while we're doing a check
        for expandCommand in self.queue.getCompletedItems():
            if expandCommand.table_expand_error:
                table_expand_error = True
                break

        if stoppedEarly:
            logger.info('End time reached.  Stopping expansion.')
            sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STOPPED', '%s' ) " % (
                gpexpand_schema, status_table, expansionStopped)
            cursor = dbconn.execSQL(self.conn, sql)
            self.conn.commit()
            logger.info('You can resume expansion by running gpexpand again')
        elif table_expand_error:
            logger.warn('**************************************************')
            logger.warn('One or more tables failed to expand successfully.')
            logger.warn('Please check the log file, correct the problem and')
            logger.warn('run gpexpand again to finish the expansion process')
            logger.warn('**************************************************')
            # We'll try to update the status, but if the errors were caused by
            # going into read only mode, this will fail.  That's ok though as
            # gpexpand will resume next run
            try:
                sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STOPPED', '%s' ) " % (
                    gpexpand_schema, status_table, expansionStopped)
                cursor = dbconn.execSQL(self.conn, sql)
                self.conn.commit()
            except:
                pass
        else:
            sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION COMPLETE', '%s' ) " % (
                gpexpand_schema, status_table, expansionStopped)
            cursor = dbconn.execSQL(self.conn, sql)
            self.conn.commit()
            logger.info("EXPANSION COMPLETED SUCCESSFULLY")

    def shutdown(self):
        """used if the script is closed abruptly"""
        logger.info('Shutting down gpexpand...')
        if self.pool:
            self.pool.haltWork()
            self.pool.joinWorkers()

        if self.queue:
            self.queue.haltWork()
            self.queue.joinWorkers()

        try:
            expansionStopped = datetime.datetime.now()
            sql = "INSERT INTO %s.%s VALUES ( 'EXPANSION STOPPED', '%s' ) " % (
                gpexpand_schema, status_table, expansionStopped)
            cursor = dbconn.execSQL(self.conn, sql)
            self.conn.commit()

            cursor.close()
            self.conn.close()
        except pg.OperationalError:
            pass
        except Exception:
            # schema doesn't exist.  Cancel or error during setup
            pass

    def halt_work(self):
        if self.pool:
            self.pool.haltWork()
            self.pool.joinWorkers()

        if self.queue:
            self.queue.haltWork()
            self.queue.joinWorkers()

    def cleanup_schema(self, gpexpand_db_status):
        """Removes the gpexpand schema"""
        # drop schema
        if gpexpand_db_status != 'EXPANSION COMPLETE':
            c = dbconn.connect(self.dburl, encoding='UTF8')
            self.logger.warn('Expansion has not yet completed.  Removing the expansion')
            self.logger.warn('schema now will leave the following tables unexpanded:')
            unexpanded_tables_sql = "SELECT fq_name FROM %s.%s WHERE status = 'NOT STARTED' ORDER BY rank" % (
                gpexpand_schema, status_detail_table)
            cursor = dbconn.execSQL(c, unexpanded_tables_sql)
            unexpanded_tables_text = ''.join("\t%s\n" % row[0] for row in cursor)

            c.close()

            self.logger.warn(unexpanded_tables_text)
            self.logger.warn('These tables will have to be expanded manually by setting')
            self.logger.warn('the distribution policy using the ALTER TABLE command.')
            if not ask_yesno('', "Are you sure you want to drop the expansion schema?", 'N'):
                logger.info("User Aborted. Exiting...")
                sys.exit(0)

        # See if user wants to dump the status_detail table to file
        c = dbconn.connect(self.dburl, encoding='UTF8')
        if ask_yesno('', "Do you want to dump the gpexpand.status_detail table to file?", 'Y'):
            self.logger.info(
                "Dumping gpexpand.status_detail to %s/gpexpand.status_detail" % self.options.master_data_directory)
            copy_gpexpand_status_detail_sql = "COPY gpexpand.status_detail TO '%s/gpexpand.status_detail'" % self.options.master_data_directory
            dbconn.execSQL(c, copy_gpexpand_status_detail_sql)

        self.logger.info("Removing gpexpand schema")
        dbconn.execSQL(c, drop_schema_sql)
        c.commit()
        c.close()

    def connect_database(self, dbname):
        test_url = copy.deepcopy(self.dburl)
        test_url.pgdb = dbname
        c = dbconn.connect(test_url, encoding='UTF8', allowSystemTableMods=True)
        return c

    def sync_packages(self):
        """
        The design decision here is to squash any exceptions resulting from the
        synchronization of packages. We should *not* disturb the user's attempts to expand.
        """
        try:
            logger.info('Syncing Greenplum Database extensions')
            new_segment_list = self.gparray.getExpansionSegDbList()
            new_host_set = set([h.getSegmentHostName() for h in new_segment_list])
            operations = [SyncPackages(host) for host in new_host_set]
            ParallelOperation(operations, self.numworkers).run()
            # introspect outcomes
            for operation in operations:
                operation.get_ret()
        except Exception:
            logger.exception('Syncing of Greenplum Database extensions has failed.')
            logger.warning('Please run gppkg --clean after successful expansion.')

    def validate_heap_checksums(self):
        num_workers = min(len(self.gparray.get_hostlist()), MAX_PARALLEL_EXPANDS)
        heap_checksum_util = HeapChecksum(gparray=self.gparray, num_workers=num_workers, logger=self.logger)
        successes, failures = heap_checksum_util.get_segments_checksum_settings()
        if len(successes) == 0:
            logger.fatal("No segments responded to ssh query for heap checksum. Not expanding the cluster.")
            return 1

        consistent, inconsistent, master_heap_checksum = heap_checksum_util.check_segment_consistency(successes)

        inconsistent_segment_msgs = []
        for segment in inconsistent:
            inconsistent_segment_msgs.append("dbid: %s "
                                             "checksum set to %s differs from master checksum set to %s" %
                                             (segment.getSegmentDbId(), segment.heap_checksum,
                                              master_heap_checksum))

        if not heap_checksum_util.are_segments_consistent(consistent, inconsistent):
            self.logger.fatal("Cluster heap checksum setting differences reported")
            self.logger.fatal("Heap checksum settings on %d of %d segment instances do not match master <<<<<<<<"
                              % (len(inconsistent_segment_msgs), len(self.gparray.segmentPairs)))
            self.logger.fatal("Review %s for details" % get_logfile())
            log_to_file_only("Failed checksum consistency validation:", logging.WARN)
            self.logger.fatal("gpexpand error: Cluster will not be modified as checksum settings are not consistent "
                              "across the cluster.")

            for msg in inconsistent_segment_msgs:
                log_to_file_only(msg, logging.WARN)

            raise Exception("Segments have heap_checksum set inconsistently to master")
        else:
            self.logger.info("Heap checksum setting consistent across cluster")


# -----------------------------------------------
class ExpandTable():
    def __init__(self, options, row=None):
        self.options = options
        self.is_root_partition = False
        if row is not None:
            (self.dbname, self.fq_name, self.schema_oid, self.table_oid,
             self.distrib_policy, self.distrib_policy_names, self.distrib_policy_coloids,
             self.distrib_policy_type, self.root_partition_name,
             self.storage_options, self.rank, self.status,
             self.expansion_started, self.expansion_finished,
             self.source_bytes) = row
        if self.fq_name == self.root_partition_name:
            self.is_root_partition = True

    def add_table(self, conn):
        insertSQL = """INSERT INTO %s.%s
                            VALUES ('%s','%s',%s,%s,
                                    '%s','%s', '%s', '%s', '%s','%s',%d,'%s','%s','%s',%d)
                    """ % (gpexpand_schema, status_detail_table,
                           self.dbname, self.fq_name.replace("\'", "\'\'"), self.schema_oid, self.table_oid,
                           self.distrib_policy, self.distrib_policy_names, self.distrib_policy_coloids,
                           self.distrib_policy_type, self.root_partition_name,
                           self.storage_options, self.rank, self.status,
                           self.expansion_started, self.expansion_finished,
                           self.source_bytes)
        logger.info('Added table %s.%s' % (self.dbname.decode('utf-8'), self.fq_name.decode('utf-8')))
        logger.debug(insertSQL.decode('utf-8'))
        dbconn.execSQL(conn, insertSQL)

    def mark_started(self, status_conn, table_conn, start_time, cancel_flag):
        if cancel_flag:
            return
        sql = "SELECT pg_relation_size(%s)" % (self.table_oid)
        cursor = dbconn.execSQL(table_conn, sql)
        row = cursor.fetchone()
        src_bytes = int(row[0])
        logger.debug(" Table: %s has %d bytes" % (self.fq_name.decode('utf-8'), src_bytes))

        sql = """UPDATE %s.%s
                  SET status = '%s', expansion_started='%s',
                      source_bytes = %d
                  WHERE dbname = '%s' AND schema_oid = %s
                        AND table_oid = %s """ % (gpexpand_schema, status_detail_table,
                                                  start_status, start_time,
                                                  src_bytes, self.dbname,
                                                  self.schema_oid, self.table_oid)

        logger.debug("Mark Started: " + sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()

    def reset_started(self, status_conn):
        sql = """UPDATE %s.%s
                 SET status = '%s', expansion_started=NULL, expansion_finished=NULL
                 WHERE dbname = '%s' AND schema_oid = %s
                 AND table_oid = %s """ % (gpexpand_schema, status_detail_table, undone_status,
                                           self.dbname, self.schema_oid, self.table_oid)

        logger.debug('Resetting detailed_status: %s' % sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()

    def expand(self, table_conn, cancel_flag):
        # for a root partition, we want to expand the whole partition hierarchy in one shot
        # TODO: expand leaf partitions separately in parallel
        if self.is_root_partition:
            only_str = ""
        else:
            only_str = "ONLY"

        new_storage_options = ''
        if self.storage_options:
            new_storage_options = ',' + self.storage_options

        sql = 'ALTER TABLE %s %s EXPAND TABLE' % (only_str, self.fq_name)
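        # Produces e.g. "ALTER TABLE ONLY public.sales EXPAND TABLE" (table name illustrative);
        # for a root partition only_str is empty so the whole partition hierarchy is expanded.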

        logger.info('Expanding %s.%s' % (self.dbname.decode('utf-8'), self.fq_name.decode('utf-8')))
        logger.debug("Expand SQL: %s" % sql.decode('utf-8'))

        # check is atomic in python
        if not cancel_flag:
            dbconn.execSQL(table_conn, sql)
            table_conn.commit()
            if self.options.analyze:
                sql = 'ANALYZE %s' % (self.fq_name)
                logger.info('Analyzing %s' % (self.fq_name.decode('utf-8')))
                dbconn.execSQL(table_conn, sql)
                table_conn.commit()

            return True

        # I can only get here if the cancel flag is True
        return False

    def mark_finished(self, status_conn, start_time, finish_time):
        sql = """UPDATE %s.%s
                  SET status = '%s', expansion_started='%s', expansion_finished='%s'
                  WHERE dbname = '%s' AND schema_oid = %s
                  AND table_oid = %s """ % (gpexpand_schema, status_detail_table,
                                            done_status, start_time, finish_time,
                                            self.dbname, self.schema_oid, self.table_oid)
        logger.debug(sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()

    def mark_does_not_exist(self, status_conn, finish_time):
        sql = """UPDATE %s.%s
                  SET status = '%s', expansion_finished='%s'
                  WHERE dbname = '%s' AND schema_oid = %s
                  AND table_oid = %s """ % (gpexpand_schema, status_detail_table,
                                            does_not_exist_status, finish_time,
                                            self.dbname, self.schema_oid, self.table_oid)
        logger.debug(sql.decode('utf-8'))
        dbconn.execSQL(status_conn, sql)
        status_conn.commit()


# -----------------------------------------------
class ExecuteSQLStatementsCommand(SQLCommand):
    """
    This class will execute a list of SQL statements.
    """

    def __init__(self, name, url, sqlCommandList):
        self.name = name
        self.url = url
        self.sqlCommandList = sqlCommandList
        self.conn = None
        self.error = None

        SQLCommand.__init__(self, name)

    def run(self, validateAfter=False):
        statement = None

        faultPoint = os.getenv('GP_COMMAND_FAULT_POINT')
        if faultPoint and self.name and self.name.startswith(faultPoint):
            # simulate error
            self.results = CommandResult(1, 'Fault Injection', 'Fault Injection', False, True)
            self.error = "Fault Injection"
            return

        self.results = CommandResult(rc=0
                                     , stdout=""
                                     , stderr=""
                                     , completed=True
                                     , halt=False
                                     )

        try:
            self.conn = dbconn.connect(self.url, utility=True, encoding='UTF8', allowSystemTableMods=True)
            for statement in self.sqlCommandList:
                dbconn.execSQL(self.conn, statement)
            self.conn.commit()
        except Exception, e:
            # traceback.print_exc()
            logger.error("Exception in ExecuteSQLStatements. URL = %s" % str(self.url))
            logger.error("  Statement = %s" % str(statement))
            logger.error("  Exception = %s" % str(e))
            self.error = str(e)
            self.results = CommandResult(rc=1
                                         , stdout=""
                                         , stderr=str(e)
                                         , completed=False
                                         , halt=True
                                         )
        finally:
            if self.conn != None:
                self.conn.close()

    def set_results(self, results):
        raise ExecutionError("TODO:  must implement", None)

    def get_results(self):
        return self.results

    def was_successful(self):
        return self.error is None

    def validate(self, expected_rc=0):
        raise ExecutionError("TODO:  must implement", None)


# -----------------------------------------------
class ExpandCommand(SQLCommand):
    def __init__(self, name, status_url, table, options):
        self.status_url = status_url
        self.table = table
        self.options = options
        self.cmdStr = "Expand %s.%s" % (table.dbname, table.fq_name)
        self.table_url = copy.deepcopy(status_url)
        self.table_url.pgdb = table.dbname
        self.table_expand_error = False

        SQLCommand.__init__(self, name)

    def run(self, validateAfter=False):
        # connect.
        status_conn = None
        table_conn = None
        table_exp_success = False

        try:
            status_conn = dbconn.connect(self.status_url, encoding='UTF8')
            table_conn = dbconn.connect(self.table_url, encoding='UTF8')
        except DatabaseError, ex:
            if self.options.verbose:
                logger.exception(ex)
            logger.error(ex.__str__().strip())
            if status_conn: status_conn.close()
            if table_conn: table_conn.close()
            self.table_expand_error = True
            return

        # validate table hasn't been dropped
        start_time = None
        try:
            sql = """select * from pg_class c, pg_namespace n
            where c.oid = %d and n.oid = c.relnamespace and n.oid=%d""" % (self.table.table_oid,
                                                                           self.table.schema_oid)

            cursor = dbconn.execSQL(table_conn, sql)

            if cursor.rowcount == 0:
                logger.info('%s no longer exists in database %s' % (self.table.fq_name.decode('utf-8'),
                                                                       self.table.dbname.decode('utf-8')))

                self.table.mark_does_not_exist(status_conn, datetime.datetime.now())
                status_conn.close()
                table_conn.close()
                return
            else:
                # Set conn for  cancel
                self.cancel_conn = table_conn
                start_time = datetime.datetime.now()
                if not self.options.simple_progress:
                    self.table.mark_started(status_conn, table_conn, start_time, self.cancel_flag)
                table_exp_success = self.table.expand(table_conn, self.cancel_flag)
        except Exception, ex:
            if ex.__str__().find('canceling statement due to user request') == -1 and not self.cancel_flag:
                self.table_expand_error = True
                if self.options.verbose:
                    logger.exception(ex)
                logger.error('Table %s.%s failed to expand: %s' % (self.table.dbname.decode('utf-8'),
                                                                   self.table.fq_name.decode('utf-8'),
                                                                   ex.__str__().strip()))
            else:
                logger.info('ALTER TABLE of %s.%s canceled' % (
                    self.table.dbname.decode('utf-8'), self.table.fq_name.decode('utf-8')))

        if table_exp_success:
            end_time = datetime.datetime.now()
            # update metadata
            logger.info(
                "Finished expanding %s.%s" % (self.table.dbname.decode('utf-8'), self.table.fq_name.decode('utf-8')))
            self.table.mark_finished(status_conn, start_time, end_time)
        elif not self.options.simple_progress:
            logger.info("Reseting status_detail for %s.%s" % (
                self.table.dbname.decode('utf-8'), self.table.fq_name.decode('utf-8')))
            self.table.reset_started(status_conn)

        # disconnect
        status_conn.close()
        table_conn.close()

    def set_results(self, results):
        raise ExecutionError("TODO:  must implement", None)

    def get_results(self):
        raise ExecutionError("TODO:  must implement", None)

    def was_successful(self):
        raise ExecutionError("TODO:  must implement", None)

    def validate(self, expected_rc=0):
        raise ExecutionError("TODO:  must implement", None)


# ------------------------------- UI Help --------------------------------
def read_hosts_file(hosts_file):
    new_hosts = []
    try:
        f = open(hosts_file, 'r')
        try:
            for l in f:
                if l.strip().startswith('#') or l.strip() == '':
                    continue

                new_hosts.append(l.strip())

        finally:
            f.close()
    except IOError:
        raise ExpansionError('Hosts file %s not found' % hosts_file)

    return new_hosts


def interview_setup(gparray, options):
    help = """
System Expansion is used to add segments to an existing GPDB array.
gpexpand did not detect a System Expansion that is in progress.

Before initiating a System Expansion, you need to provision and burn-in
the new hardware.  Please be sure to run gpcheckperf to make sure the
new hardware is working properly.

Please refer to the Admin Guide for more information."""

    if not ask_yesno(help, "Would you like to initiate a new System Expansion", 'N'):
        logger.info("User Aborted. Exiting...")
        sys.exit(0)

    help = """
This utility can handle some expansion scenarios by asking a few questions.
More complex expansions can be done by providing an input file with
the --input <file>.  Please see the docs for the format of this file. """

    standard, message = gparray.isStandardArray()
    if standard == False:
        help = help + """

       The current system appears to be non-standard.
       """
        help = help + message
        help = help + """
       gpexpand may not be able to symmetrically distribute the new segments appropriately.
       It is recommended that you specify your own input file with appropriate values."""
        if not ask_yesno(help, "Are you sure you want to continue with this gpexpand session?", 'N'):
            logger.info("User Aborted. Exiting...")
            sys.exit(0)

    help = help + """

We'll now ask you a few questions to try and build this file for you.
You'll have the opportunity to save this file and inspect it/modify it
before continuing by re-running this utility and providing the input file. """

    def datadir_validator(input_value, *args):
        if not input_value or input_value.find(' ') != -1 or input_value == '':
            return None
        else:
            return input_value

    if options.hosts_file:
        new_hosts = read_hosts_file(options.hosts_file)
    else:
        new_hosts = ask_list(None,
                             "\nEnter a comma separated list of new hosts you want\n" \
                             "to add to your array.  Do not include interface hostnames.\n" \
                             "**Enter a blank line to only add segments to existing hosts**", [])
        new_hosts = [host.strip() for host in new_hosts]

    num_new_hosts = len(new_hosts)

    mirror_type = 'none'

    if gparray.get_mirroring_enabled():
        if num_new_hosts < 2:
            raise ExpansionError('You must be adding two or more hosts when expanding a system with mirroring enabled.')
        mirror_type = ask_string(
            "\nYou must now specify a mirroring strategy for the new hosts.  Spread mirroring places\n" \
            "a given host's mirrored segments each on a separate host.  You must be \n" \
            "adding more hosts than the number of segments per host to use this. \n" \
            "Grouped mirroring places all of a given host's segments on a single \n" \
            "mirrored host.  You must be adding at least 2 hosts in order to use this.\n\n",
            "What type of mirroring strategy would you like?",
            'grouped', ['spread', 'grouped'])

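    # Tentatively add the new hosts; if every hostname already exists in the
    # array, fall back to adding segments to existing hosts only.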
    try:
        gparray.addExpansionHosts(new_hosts, mirror_type)
        gparray.validateExpansionSegs()
    except Exception, ex:
        num_new_hosts = 0
        if ex.__str__() == 'No new hosts to add':
            print
            print '** No hostnames were given that do not already exist in the **'
            print '** array. Additional segments will be added to existing hosts. **'
        else:
            raise

    help = """
    By default, new hosts are configured with the same number of primary
    segments as existing hosts.  Optionally, you can increase the number
    of segments per host.

    For example, if existing hosts have two primary segments, entering a value
    of 2 will initialize two additional segments on existing hosts, and four
    segments on new hosts.  In addition, mirror segments will be added for
    these new primary segments if mirroring is enabled.
    """
    num_new_datadirs = ask_int(help, "How many new primary segments per host do you want to add?", None, 0, 0, 128)

    if num_new_datadirs > 0:
        new_datadirs = []
        new_mirrordirs = []

        for i in range(1, num_new_datadirs + 1):
            new_datadir = ask_input(None, 'Enter new primary data directory %d' % i, '',
                                    '/data/gpdb_p%d' % i, datadir_validator, None)
            new_datadirs.append(new_datadir.strip())

        if len(new_datadirs) != num_new_datadirs:
            raise ExpansionError(
                'The number of data directories entered does not match the number of primary segments added')

        if gparray.get_mirroring_enabled():
            for i in range(1, num_new_datadirs + 1):
                new_mirrordir = ask_input(None, 'Enter new mirror data directory %d' % i, '',
                                          '/data/gpdb_m%d' % i, datadir_validator, None)
                new_mirrordirs.append(new_mirrordir.strip())

            if len(new_mirrordirs) != num_new_datadirs:
                raise ExpansionError(
                    'The number of new mirror data directories entered does not match the number of segments added')

        gparray.addExpansionDatadirs(datadirs=new_datadirs
                                     , mirrordirs=new_mirrordirs
                                     , mirror_type=mirror_type
                                     )
        try:
            gparray.validateExpansionSegs()
        except Exception, ex:
            if ex.__str__().find('Port') == 0:
                raise ExpansionError(
                    'Current primary and mirror ports are contiguous.  The input file for gpexpand will need to be created manually.')
    elif num_new_hosts == 0:
        raise ExpansionError('No new hosts or segments were entered.')

    print "\nGenerating configuration file...\n"

    outfile = _gp_expand.generate_inputfile()

    outFileStr = """\nInput configuration file was written to '%s'.""" % (outfile)

    print outFileStr
    print """Please review the file and make sure that it is correct then re-run
with: gpexpand -i %s %s
                """ % (outfile, '-D %s' % options.database if options.database else '')


def sig_handler(sig):
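    """Shut down any in-progress expansion work, restore the default signal
    handlers, and re-deliver the signal to this process."""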
    if _gp_expand is not None:
        _gp_expand.shutdown()

    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGHUP, signal.SIG_DFL)

    # Re-deliver the signal now that the default handlers are restored
    os.kill(os.getpid(), sig)


# --------------------------------------------------------------------------
# Main
# --------------------------------------------------------------------------
def main(options, args, parser):
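    """Drive gpexpand: validate options, determine the current expansion
    state, and run setup, expansion, rollback, or cleanup as appropriate."""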
    global _gp_expand

    remove_pid = True
    try:
        # setup signal handlers so we can clean up correctly
        signal.signal(signal.SIGTERM, sig_handler)
        signal.signal(signal.SIGHUP, sig_handler)

        logger = get_default_logger()
        setup_tool_logging(EXECNAME, getLocalHostname(), getUserName())

        options, args = validate_options(options, args, parser)

        if options.verbose:
            enable_verbose_logging()

        if is_gpexpand_running(options.master_data_directory):
            logger.error('gpexpand is already running.  Only one instance')
            logger.error('of gpexpand is allowed at a time.')
            remove_pid = False
            sys.exit(1)
        else:
            create_pid_file(options.master_data_directory)

        # prepare provider for updateSystemConfig
        gpEnv = GpMasterEnvironment(options.master_data_directory, True)
        configurationInterface.registerConfigurationProvider(
            configurationImplGpdb.GpConfigurationProviderUsingGpdbCatalog())
        configurationInterface.getConfigurationProvider().initializeProvider(gpEnv.getMasterPort())

        dburl = dbconn.DbURL()
        if options.database:
            dburl.pgdb = options.database

        gpexpand_db_status = gpexpand.prepare_gpdb_state(logger, dburl, options)

        # Get array configuration
        try:
            gparray = GpArray.initFromCatalog(dburl, utility=True)
        except DatabaseError, ex:
            logger.error('Failed to connect to database.  Make sure the')
            logger.error('Greenplum instance you wish to expand is running')
            logger.error('and that your environment is correct, then rerun')
            logger.error('gpexpand ' + ' '.join(sys.argv[1:]))
            sys.exit(1)

        _gp_expand = gpexpand(logger, gparray, dburl, options, parallel=options.parallel)

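        # No expansion is recorded in the database; check whether a previous
        # setup run left partial state behind.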
        gpexpand_file_status = None
        if not gpexpand_db_status:
            gpexpand_file_status = _gp_expand.get_state()

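        # gpexpand -c: remove the gpexpand schema left by a previous expansion.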
        if options.clean and gpexpand_db_status is not None:
            _gp_expand.cleanup_schema(gpexpand_db_status)
            logger.info('Cleanup Finished.  exiting...')
            sys.exit(0)

        if options.rollback:
            try:
                if gpexpand_db_status:
                    logger.error('A previous expansion is either in progress or has')
                    logger.error('completed.  Since the setup portion of the expansion')
                    logger.error('has finished successfully there is nothing to rollback.')
                    sys.exit(1)
                if gpexpand_file_status is None:
                    logger.error('There is no partially completed setup to rollback.')
                    sys.exit(1)
                _gp_expand.rollback(dburl)
                logger.info('Rollback complete.  Greenplum Database can now be started')
                sys.exit(0)
            except ExpansionError, e:
                logger.error(e)
                sys.exit(1)

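        # Dispatch on the recorded expansion state: resume or start table
        # redistribution, report completion, initialize a new expansion from
        # an input file, run the interactive interview, or ask for a rollback.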
        if gpexpand_db_status == 'SETUP DONE' or gpexpand_db_status == 'EXPANSION STOPPED':
            if not _gp_expand.validate_max_connections():
                raise ValidationError()
            _gp_expand.perform_expansion()
        elif gpexpand_db_status == 'EXPANSION STARTED':
            logger.info('It appears the last run of gpexpand did not exit cleanly.')
            logger.info('Resuming the expansion process...')
            if not _gp_expand.validate_max_connections():
                raise ValidationError()
            _gp_expand.perform_expansion()
        elif gpexpand_db_status == 'EXPANSION COMPLETE':
            logger.info('Expansion has already completed.')
            logger.info('If you want to expand again, run gpexpand -c to remove')
            logger.info('the gpexpand schema and begin a new expansion')
        elif gpexpand_db_status is None and gpexpand_file_status is None and options.filename:
            if not _gp_expand.validate_unalterable_tables():
                raise ValidationError()
            if _gp_expand.check_unique_indexes():
                logger.warn("Tables with unique indexes exist.  Until these tables are successfully")
                logger.warn("redistributed, unique constraints may be violated.  For more information")
                logger.warn("on this issue, see the Greenplum Database Administrator Guide")
                if not options.silent:
                    if not ask_yesno(None, "Would you like to continue with System Expansion", 'N'):
                        raise ValidationError()
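            # Setup phase: register the new segments, add them while the
            # catalog is locked, create the gpexpand schema, and sync mirrors.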
            _gp_expand.validate_heap_checksums()
            newSegList = _gp_expand.read_input_files()
            _gp_expand.addNewSegments(newSegList)
            _gp_expand.sync_packages()
            _gp_expand.start_prepare()
            _gp_expand.lock_catalog()
            _gp_expand.add_segments()
            _gp_expand.update_original_segments()
            _gp_expand.cleanup_new_segments()
            _gp_expand.update_catalog()
            _gp_expand.unlock_catalog()
            _gp_expand.setup_schema()
            _gp_expand.prepare_schema()
            _gp_expand.sync_new_mirrors()
            logger.info('************************************************')
            logger.info('Initialization of the system expansion complete.')
            logger.info('To begin table expansion onto the new segments')
            logger.info('rerun gpexpand')
            logger.info('************************************************')
        elif options.filename is None and gpexpand_file_status is None:
            interview_setup(gparray, options)
        else:
            logger.error('The last gpexpand setup did not complete successfully.')
            logger.error('Please run gpexpand -r to rollback to the original state.')

        logger.info("Exiting...")
        sys.exit(0)

    except ValidationError:
        logger.info('Bringing Greenplum Database back online...')
        if _gp_expand is not None:
            _gp_expand.shutdown()
        sys.exit()
    except Exception, e:
        if options and options.verbose:
            logger.exception("gpexpand failed. exiting...")
        else:
            logger.error("gpexpand failed: %s \n\nExiting..." % e)
        if _gp_expand is not None and _gp_expand.pastThePointOfNoReturn:
            logger.error(
                'gpexpand is past the point of rollback. Any remaining issues must be addressed outside of gpexpand.')
        if _gp_expand is not None:
            if not (gpexpand_db_status is None and _gp_expand.get_state() is None):
                if not _gp_expand.pastThePointOfNoReturn:
                    logger.error('Please run \'gpexpand -r%s\' to rollback to the original state.' % (
                        '' if not options.database else ' -D %s' % options.database))
            _gp_expand.shutdown()
        sys.exit(3)
    except KeyboardInterrupt:
        # Disable SIGINT while we shutdown.
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        if _gp_expand is not None:
            _gp_expand.shutdown()

        # Re-enable SIGINT
        signal.signal(signal.SIGINT, signal.default_int_handler)

        sys.exit('\nUser Interrupted')


    finally:
        try:
            if remove_pid and options:
                remove_pid_file(options.master_data_directory)
        except NameError:
            pass

        if _gp_expand is not None:
            _gp_expand.halt_work()

if __name__ == '__main__':
    options, args, parser = parseargs()
    main(options, args, parser)