Commit 9d1ae66c authored by Jamie McAtamney

Refactored argument passing in backup and restore to use a context variable.

Previously, the functions and subclasses in gpcrondump, gpdbrestore, dump.py,
and restore.py all required long lists of arguments to be passed around and
relied on many helper functions.  This made new command-line options and
new features quite time-consuming to implement and test.

Now, a Context class has been added to backup_utils.py that holds all of the
variables used for command-line options, as well as other variables used in
many places during backup and restore.  Additionally, many helper functions
have been condensed into a few functions within Context, and in the future
more such functions can be moved into Context as needed.  This will make it
much easier to extend dump and restore functionality going forward.
Parent b71771ce
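
As a quick illustration of the refactoring pattern described above, here is a
minimal sketch (not the real backup_utils.py; only the master_port and
dump_database attributes are taken from this diff, and the
validate_schema_exists stand-ins merely mirror the before/after shape of
ValidateSchemaExists):

# Minimal sketch of the context-object pattern; every name not shown in the
# diff below is hypothetical.

class Context(object):
    """Single holder for command-line options and shared backup/restore state."""
    def __init__(self):
        self.master_port = None    # set from PGPORT / self.pg_port in this diff
        self.dump_database = None  # set from dbname / self.dbname in this diff
        self.timestamp = None      # hypothetical example attribute

# Before: every option is threaded through each helper's signature.
def validate_schema_exists_before(dbname, schema, master_port):
    """Stand-in for the old ValidateSchemaExists(dbname, schema, port) shape."""
    return (dbname, schema, master_port)

# After: one context object plus the call-specific arguments, matching calls
# in this diff such as ValidateSchemaExists(self.context, self.schema).
def validate_schema_exists_after(context, schema):
    """Stand-in for the new ValidateSchemaExists(context, schema) shape."""
    return (context.dump_database, schema, context.master_port)

context = Context()
context.master_port = 5432
context.dump_database = 'gptest'
assert validate_schema_exists_after(context, 'public') == ('gptest', 'public', 5432)
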
@@ -32,11 +32,9 @@ try:
     from gppylib.gpversion import GpVersion
     from gppylib.db import dbconn
     from gppylib.operations.unix import CheckDir, CheckFile, MakeDir
-    from gppylib.operations.dump import get_partition_state_tuples, compare_metadata, \
-        compare_dict, get_pgstatlastoperations_dict, \
-        write_lines_to_file, verify_lines_in_file, ValidateIncludeTargets, \
-        ValidateSchemaExists
-    from gppylib.operations.backup_utils import execute_sql, get_lines_from_file
+    from gppylib.operations.dump import get_partition_state_tuples, compare_metadata, compare_dict, get_pgstatlastoperations_dict, \
+        write_lines_to_file, verify_lines_in_file, ValidateSchemaExists
+    from gppylib.operations.backup_utils import execute_sql, get_lines_from_file, Context
     from pygresql import pg
 except ImportError, e:
@@ -273,6 +271,11 @@ class AnalyzeDb(Operation):
         global CURR_TIME
         CURR_TIME = generate_timestamp()

+        # The below object is needed for some functions imported from dump.py
+        self.context = Context()
+        self.context.master_port = self.pg_port
+        self.context.dump_database = self.dbname
+
         # The input_col_dict keeps track of the requested columns to analyze.
         # key: table name (e.g. 'public.foo')
         # value: a set of requested column names (e.g. set(['col1','col2']), or '-1' indicating all columns
@@ -447,7 +450,7 @@ class AnalyzeDb(Operation):
             self._parse_column(col_dict, self.single_table, schema, table, self.include_cols, self.exclude_cols, True)
         elif self.schema: # all tables in a schema
-            ValidateSchemaExists(self.dbname, self.schema, self.pg_port).run()
+            ValidateSchemaExists(self.context, self.schema).run()
             logger.debug("getting all tables in the schema...")
             all_schema_tables = run_sql(self.conn, GET_ALL_DATA_TABLES_IN_SCHEMA_SQL % self.schema)
             # convert table name from ['public','foo'] to 'public.foo' and populate col_dict as all columns requested
@@ -547,7 +550,7 @@ class AnalyzeDb(Operation):
         logger.debug("getting ao state...")
         oid_str = get_oid_str(input_tables_set)
         ao_partition_info = run_sql(self.conn, GET_REQUESTED_AO_DATA_TABLE_INFO_SQL % oid_str)
-        return get_partition_state_tuples(self.pg_port, self.dbname, 'pg_aoseg', ao_partition_info)
+        return get_partition_state_tuples(self.context, 'pg_aoseg', ao_partition_info)

     def _get_lastop_state(self, input_tables_set):
         # oid, action, subtype, timestamp
...
This diff is collapsed.
This diff is collapsed.
@@ -1340,7 +1340,7 @@ def get_gphome():

 def get_masterdatadir():
     logger.debug("Checking if MASTER_DATA_DIRECTORY env variable is set.")
     master_datadir = os.environ.get('MASTER_DATA_DIRECTORY')
-    if master_datadir is None:
+    if not master_datadir:
         raise GpError("Environment Variable MASTER_DATA_DIRECTORY not set!")
     return master_datadir
...
@@ -761,6 +761,18 @@ class Segment:
         if (prim_status, prim_mode, mirror_status, mirror_role) not in VALID_SEGMENT_STATES:
             return False
         return True

+    def get_active_primary(self):
+        if self.primaryDB.isSegmentPrimary(current_role=True):
+            return self.primaryDB
+        else:
+            for mirror in self.mirrorDBs:
+                if mirror.isSegmentPrimary(current_role=True):
+                    return mirror
+
+    def get_primary_dbid(self):
+        return self.primaryDB.getSegmentDbId()
+
 # --------------------------------------------------------------------
 # --------------------------------------------------------------------
 class SegmentRow():
@@ -1723,6 +1735,14 @@ class GpArray:
             dbs.extend(seg.get_dbs())
         return dbs

+    # --------------------------------------------------------------------
+    def getSegmentList(self, includeExpansionSegs=False):
+        """Return a list of all GpDb objects for all segments in the array"""
+        dbs = []
+        dbs.extend(self.segments)
+        if includeExpansionSegs:
+            dbs.extend(self.expansionSegments)
+        return dbs
+
     # --------------------------------------------------------------------
     def getSegDbMap(self):
...
@@ -21,7 +21,7 @@ from gppylib.operations.utils import ParallelOperation, RemoteOperation
 from gppylib.operations.unix import CleanSharedMem
 from gppylib.operations.filespace import PG_SYSTEM_FILESPACE, GP_TRANSACTION_FILES_FILESPACE, GP_TEMPORARY_FILES_FILESPACE, GetMoveOperationList, GetFilespaceEntriesDict, GetFilespaceEntries, GetCurrentFilespaceEntries, RollBackFilespaceChanges, UpdateFlatFiles, FileType, MoveFilespaceError
 from gppylib.commands.gp import is_pid_postmaster, get_pid_from_remotehost
-from gppylib.commands.unix import check_pid_on_remotehost
+from gppylib.commands.unix import check_pid_on_remotehost, Scp

 logger = get_default_logger()
@@ -533,9 +533,9 @@ class GpMirrorListToBuild:
         #
         # Copy remote files from the sample segment to the master
         #
-        for toCopyFromRemote in ["postgresql.conf", "pg_hba.conf"]:
+        for toCopyFromRemote in ["postgresql.conf", "pg_hba.conf", "db_dumps"]:
             cmd = gp.RemoteCopy('copying %s from a segment' % toCopyFromRemote,
-                                sampleSegment.getSegmentDataDirectory() + '/' + toCopyFromRemote,
+                                os.path.join(sampleSegment.getSegmentDataDirectory(), toCopyFromRemote),
                                 masterSegment.getSegmentHostName(), schemaDir, ctxt=base.REMOTE,
                                 remoteHost=sampleSegment.getSegmentAddress())
             cmd.run(validateAfter=True)
@@ -632,6 +632,21 @@ class GpMirrorListToBuild:
             cmds.append(createConfigureNewSegmentCommand(hostName, 'configure blank segments', False))
         self.__runWaitAndCheckWorkerPoolForErrorsAndClear(cmds, "unpacking basic segment directory")

+        #
+        # copy dump files from old segment to new segment
+        #
+        for srcSeg in srcSegments:
+            for destSeg in destSegments:
+                if srcSeg.content == destSeg.content:
+                    cmd = Scp('copy db_dumps from old segment to new segment',
+                              os.path.join(srcSeg.getSegmentDataDirectory(), 'db_dumps*', '*'),
+                              os.path.join(destSeg.getSegmentDataDirectory(), 'db_dumps'),
+                              srcSeg.getSegmentAddress(),
+                              destSeg.getSegmentAddress(),
+                              recursive=True)
+                    cmd.run(validateAfter=True)
+                    break
+
         #
         # Clean up copied tar from each remote host
         #
...
@@ -10,7 +10,7 @@ Create table " co_T`~@#$%^&*()-+[{]}|\;: \'""/?><1 " (Column1 int, Column2 varch
 (Partition p1 values('backup') , Partition p2 values('restore')) ;

 Create table " ao_T`~@#$%^&*()-+[{]}|\;: \'""/?><1 " (Column1 int, Column2 varchar(20), Column3 date)
-WITH(appendonly = true, orientation = row, compresstype = quicklz)
+WITH(appendonly = true, orientation = row)
 Distributed Randomly Partition by list(Column2)
 Subpartition by range(Column3) Subpartition Template
 (start (date '2014-01-01') end (date '2016-01-01') every (interval '1 year'))
...
@@ -13,6 +13,7 @@ import yaml

 from gppylib.commands.gp import SegmentStart, GpStandbyStart
 from gppylib.commands.unix import findCmdInPath
+from gppylib.operations.backup_utils import Context
 from gppylib.operations.dump import get_partition_state
 from gppylib.operations.startSegments import MIRROR_MODE_MIRRORLESS
 from gppylib.operations.unix import ListRemoteFilesByPattern, CheckRemoteFile
@@ -1498,10 +1499,12 @@ def impl(context, table, dbname, ao_table):
     ao_sch, ao_tbl = ao_table.split('.')
     part_info = [(1, ao_sch, ao_tbl, tbl)]
     try:
+        backup_utils = Context()
+        backup_utils.master_port = os.environ.get('PGPORT')
+        backup_utils.dump_database = dbname
         context.exception = None
         context.partition_list_res = None
-        context.partition_list_res = get_partition_state(master_port=os.environ.get('PGPORT'),
-                                                         dbname=dbname, catalog_schema=sch, partition_info=part_info)
+        context.partition_list_res = get_partition_state(backup_utils, sch, part_info)
     except Exception as e:
         context.exception = e
...
@@ -346,7 +346,7 @@ def check_table_exists(context, dbname, table_name, table_type=None, host=None,
         SQL = """
               select c.oid, c.relkind, c.relstorage, c.reloptions
               from pg_class c, pg_namespace n
-              where c.relname = E'%s' and n.nspname = E'%s' and c.relnamespace = n.oid;
+              where c.relname = '%s' and n.nspname = '%s' and c.relnamespace = n.oid;
               """ % (pg.escape_string(tablename), pg.escape_string(schemaname))
     else:
         SQL = """
@@ -1475,4 +1475,4 @@ def remove_local_path(dirname):

 def validate_local_path(path):
     list = glob.glob(os.path.join(os.path.curdir, path))
-    return len(list)
\ No newline at end of file
+    return len(list)