Commit 92763dd8 authored by Pengcheng Tang

gpdbrestore to validate table names before starting the restore process

gpdbrestore now pre-validates table names (including partition tables) against
the dump file. This fix applies to DDBoost, NetBackup, and other remote dump
locations.
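
GetDumpTables now reads the master dump file (locally, over ssh, via gpddboost,
or via gp_bsa_restore_agent), greps it for "-- Name:" and partition lines, and the
requested tables are checked against that list before any restore work starts. A
condensed sketch of the final check (the full version is added in
gppylib/operations/restore.py):

    def validate_tablenames_exist_in_dump_file(restore_tables, dumped_tables):
        # dumped_tables is the (schema, table, owner) list from GetDumpTables.get_dump_tables()
        if not dumped_tables:
            raise Exception('No dumped tables to restore.')
        dumped_table_names = [schema + '.' + table for (schema, table, _) in dumped_tables]
        missing = [t for t in restore_tables if t not in dumped_table_names]
        if missing:
            raise Exception("Tables %s not found in backup" % missing)

With this in place, a request such as "gpdbrestore -t <timestamp> -T public.heap_table2"
fails up front with "Tables ['public.heap_table2'] not found in backup" instead of
failing partway through the restore.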

The change also includes some code refactoring.

Authors:
Pengcheng Tang, Nikhil Kak,
Lawrence Hamel, Marbin Victor Tan
Parent 8ae2a5e8
......@@ -6,8 +6,9 @@ from gppylib.mainUtils import addMasterDirectoryOptionForSingleClusterProgram, a
ExceptionNoStackTraceNeeded, ProgramArgumentValidationException, UserAbortedException
import fnmatch
import os
import stat
import sys
from optparse import OptionGroup, OptionValueError
from optparse import OptionGroup, OptionValueError, SUPPRESS_HELP
from pygresql import pg
try:
......@@ -25,13 +26,15 @@ try:
from gppylib.gpparseopts import OptParser, OptChecker
from gppylib.operations import Operation
from gppylib.operations.backup_utils import check_dir_writable, generate_createdb_prefix, generate_master_dbdump_prefix, \
generate_report_filename, get_lines_from_file, check_schema_exists, check_funny_chars_in_names
generate_report_filename, get_lines_from_file, check_schema_exists, check_funny_chars_in_names, \
get_full_timestamp_for_incremental, get_master_dump_file
from gppylib.operations.restore import GetDbName, GetDumpTables, RecoverRemoteDumps, RestoreDatabase, ValidateTimestamp, \
config_files_dumped, create_restore_plan, get_restore_dir, get_restore_tables_from_table_file, \
global_file_dumped, is_begin_incremental_run, is_incremental_restore, restore_cdatabase_file_with_nbu, \
restore_config_files_with_nbu, restore_global_file_with_nbu, restore_statistics_file_with_nbu, \
restore_increments_file_with_nbu, restore_partition_list_file_with_nbu, restore_report_file_with_nbu, \
restore_state_files_with_nbu, statistics_file_dumped, truncate_restore_tables, validate_tablenames
restore_state_files_with_nbu, statistics_file_dumped, truncate_restore_tables, check_table_name_format_and_duplicate, \
get_incremental_restore_timestamps, validate_tablenames_exist_in_dump_file
from gppylib.operations.utils import DEFAULT_NUM_WORKERS
from gppylib.operations.unix import CheckFile, CheckRemoteFile, ListFilesByPattern, ListRemoteFilesByPattern
except ImportError, e:
......@@ -89,6 +92,7 @@ class GpdbRestore(Operation):
self.restore_stats = options.restore_stats
self.metadata_only = options.metadata_only
self.schema_level_restore_list = options.restore_schemas
self.no_validate_table_name = options.no_validate_table_name
if self.restore_stats:
self.no_analyze = True
......@@ -211,10 +215,14 @@ class GpdbRestore(Operation):
if statistics_file_dumped(self.master_datadir, self.backup_dir, self.dump_dir, self.dump_prefix, self.db_timestamp, self.netbackup_service_host):
restore_statistics_file_with_nbu(self.master_datadir, self.backup_dir, self.dump_dir, self.dump_prefix, self.db_timestamp, self.netbackup_service_host, self.netbackup_block_size)
info = self._gather_dump_info()
if self.list_tables and self.db_timestamp is not None:
return self._list_dump_tables()
return self._list_dump_tables(info['restore_timestamp'], info['dump_host'], info['dump_file'], info['compress'])
info.update(self._gather_cluster_info())
info = self._gather_info()
self.validate_tablename_from_filtering_options(info['restore_timestamp'], info['dump_host'], info['dump_file'], info['compress'])
if self.restore_stats == "only":
restore_type = "Statistics-Only Restore"
......@@ -380,14 +388,20 @@ class GpdbRestore(Operation):
pgconf_dict = pgconf.readfile(datadir + "/postgresql.conf")
return pgconf_dict.int('port')
def _list_dump_tables(self):
dump_tables = GetDumpTables(master_datadir = self.master_datadir,
backup_dir = self.backup_dir,
restore_timestamp = self.db_timestamp,
dump_dir = self.dump_dir,
dump_prefix = self.dump_prefix,
ddboost = self.ddboost,
netbackup_service_host = self.netbackup_service_host).run()
def _list_dump_tables(self, restore_timestamp, dump_host, dump_file, compress):
"""
List only the dumped tables for the specified timestamp; the timestamp is given through the -t option together with the -L option
"""
dump_tables = GetDumpTables(master_datadir=self.master_datadir,
backup_dir=self.backup_dir,
restore_timestamp=restore_timestamp,
dump_dir=self.dump_dir,
dump_prefix=self.dump_prefix,
compress=compress,
ddboost=self.ddboost,
netbackup_service_host=self.netbackup_service_host,
remote_host=dump_host,
dump_file=dump_file).get_dump_tables()
logger.info("--------------------------------------------------------------------")
logger.info("List of database tables for dump file with time stamp %s" % self.db_timestamp)
......@@ -405,7 +419,11 @@ class GpdbRestore(Operation):
logger.info("Backup Timestamp: %s" % (line.split(':')[0].strip()))
logger.info("--------------------------------------------------------------------")
def _gather_info(self):
def _gather_cluster_info(self):
"""
Check the cluster status; no primary segment should be down.
"""
fault_action = self.gparray.getFaultStrategy()
primaries = [seg for seg in self.gparray.getDbList() if seg.isSegmentPrimary(current_role=True)]
fail_count = len([seg for seg in primaries if seg.isSegmentDown()])
......@@ -416,7 +434,15 @@ class GpdbRestore(Operation):
logger.info("Use gprecoverseg utility to recover failed segment instances")
raise ExceptionNoStackTraceNeeded("Unable to continue")
(restore_timestamp, restore_db, compress) = (None, None, None)
return {'fault_action': fault_action,
'fail_count': fail_count}
def _gather_dump_info(self):
"""
Validate the restore timestamp and collect information about the dump path and location.
"""
(restore_timestamp, restore_db, compress, dump_host, dump_file) = (None, None, None, None, None)
if self.db_timestamp is not None:
(restore_timestamp, restore_db, compress) = ValidateTimestamp(candidate_timestamp = self.db_timestamp,
master_datadir = self.master_datadir,
......@@ -428,7 +454,7 @@ class GpdbRestore(Operation):
elif self.db_date_dir is not None:
(restore_timestamp, restore_db, compress) = self._validate_db_date_dir()
elif self.db_host_path is not None:
(restore_timestamp, restore_db, compress) = self._validate_db_host_path()
(restore_timestamp, restore_db, compress, dump_host, dump_file) = self._validate_db_host_path()
elif self.search_for_dbname is not None:
(restore_timestamp, restore_db, compress) = self._search_for_latest()
......@@ -441,22 +467,50 @@ class GpdbRestore(Operation):
raise ExceptionNoStackTraceNeeded("Database %s does not exist and -e option not supplied" % restore_db)
table_counts = []
if len(self.restore_tables) > 0 or len(self.schema_level_restore_list) > 0:
self.restore_tables, self.schema_level_restore_list = validate_tablenames(self.restore_tables, self.master_datadir, self.backup_dir, self.dump_dir,
self.dump_prefix, restore_timestamp, self.schema_level_restore_list)
if not self.db_host_path and not self.ddboost:
report_filename = generate_report_filename(self.master_datadir, self.backup_dir, self.dump_dir, self.dump_prefix, restore_timestamp)
if not os.path.isfile(report_filename):
raise ExceptionNoStackTraceNeeded("Report file does not exist for the given restore timestamp %s: '%s'" % (restore_timestamp, report_filename))
return {'fault_action': fault_action,
'fail_count': fail_count,
'restore_timestamp': restore_timestamp,
if not self.db_host_path:
dump_file = get_master_dump_file(self.master_datadir, self.backup_dir, self.dump_dir, restore_timestamp, self.dump_prefix, self.ddboost)
dump_file = dump_file + ('.gz' if compress else '')
return {'restore_timestamp': restore_timestamp,
'restore_db': restore_db,
'compress': compress,
'dump_host': dump_host,
'dump_file': dump_file,
'table_counts': table_counts}
def validate_tablename_from_filtering_options(self, restore_timestamp, dump_host=None, dump_file=None, compress=None):
"""
Validate the table name format and resolve duplicates among the user inputs from any of the gpdbrestore filtering options
"""
if len(self.restore_tables) > 0 or len(self.schema_level_restore_list) > 0:
self.restore_tables, self.schema_level_restore_list = check_table_name_format_and_duplicate(self.restore_tables, self.schema_level_restore_list)
if not self.no_validate_table_name and len(self.restore_tables) > 0:
is_remote_service = self.ddboost or self.netbackup_service_host or self.db_host_path
if not is_remote_service and stat.S_ISFIFO(os.stat(dump_file).st_mode):
logger.warn('Skipping validation of tables in dump file due to the use of named pipes')
else:
dumped_tables = GetDumpTables(
restore_timestamp = restore_timestamp,
master_datadir = self.master_datadir,
backup_dir = self.backup_dir,
dump_dir = self.dump_dir,
dump_prefix = self.dump_prefix,
compress = compress,
ddboost = self.ddboost,
netbackup_service_host = self.netbackup_service_host,
remote_host = dump_host,
dump_file = dump_file).get_dump_tables()
validate_tablenames_exist_in_dump_file(self.restore_tables, dumped_tables)
def _validate_db_date_dir(self):
root = os.path.join(get_restore_dir(self.master_datadir, self.backup_dir), self.dump_dir, self.db_date_dir)
if not os.path.isdir(root):
......@@ -477,24 +531,30 @@ class GpdbRestore(Operation):
temp = match[len(generate_createdb_prefix(self.dump_prefix)):]
restore_timestamp = temp[0:14]
restore_db = GetDbName(os.path.join(root, "%s%s" % (generate_createdb_prefix(self.dump_prefix), restore_timestamp))).run()
compressed_file = "%s%s.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp)
compress = CheckFile(os.path.join(root, compressed_file)).run()
if not self.ddboost:
compressed_file = os.path.join(root, "%s%s.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
else:
compressed_file = os.path.join(root, "%s%s_post_data.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
compress = CheckFile(compressed_file).run()
return (restore_timestamp, restore_db, compress)
def _validate_db_host_path(self):
# The format of the -R option should be 'hostname:path_to_dumpset', hence the length should be 2 (hostname, path_to_dumpset)
if len(self.db_host_path) != 2:
raise ProgramArgumentValidationException("The arguments of the -R flag are incorrect. The correct form should be as follows:\nIPv4_address:path_to_dumpset\n-OR-\n[IPv6_address]:path_to_dumpset\n", False)
raise ProgramArgumentValidationException("The arguments of the -R flag are incorrect. The correct form should be as "
"follows:\nIPv4_address:path_to_dumpset\n-OR-\n[IPv6_address]:path_to_dumpset\n", False)
host, path = self.db_host_path
logger.debug("The host is %s" % host)
logger.debug("The path is %s" % path)
Ping.local('Pinging %s' % host, host)
matching = ListRemoteFilesByPattern(path, "%s*" % generate_createdb_prefix(self.dump_prefix), host).run()
if len(matching) == 0:
raise ExceptionNoStackTraceNeeded("Could not locate Master database dump files at %s:%s" % (host, path))
matching = sorted(matching, key=lambda x: int(x[len(generate_createdb_prefix(self.dump_prefix)):].strip()))
if len(matching) > 1:
dates_and_times = []
for match in matching:
......@@ -514,9 +574,21 @@ class GpdbRestore(Operation):
restore_db = GetDbName(filename).run()
os.remove(filename)
compressed_file = "%s%s.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp)
compress = CheckRemoteFile(os.path.join(path, compressed_file), host).run()
return (restore_timestamp, restore_db, compress)
compress = False
file_name = "%s%s" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp)
uncompressed_file_path = os.path.join(path, file_name)
compressed_file_path = uncompressed_file_path + '.gz'
res = CheckRemoteFile(uncompressed_file_path, host).run()
if not res:
res = CheckRemoteFile(compressed_file_path, host).run()
if not res:
raise Exception("Cannot find dump file %s on remote %s" % (compressed_file_path, host))
else:
compress = True
return (restore_timestamp, restore_db, compress, host, compressed_file_path)
else:
return (restore_timestamp, restore_db, compress, host, uncompressed_file_path)
def _search_for_latest(self):
logger.info("Scanning Master host for latest dump file set for database %s" % self.search_for_dbname)
......@@ -539,7 +611,14 @@ class GpdbRestore(Operation):
restore_timestamp = str(max(timestamps))
logger.info("Identified latest dump timestamp for %s as %s" % (self.search_for_dbname, restore_timestamp))
restore_db = self.search_for_dbname
compressed_file = os.path.join(get_restore_dir(self.master_datadir, self.backup_dir), self.dump_dir, restore_timestamp[0:8], "%s%s.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
if not self.ddboost:
compressed_file = os.path.join(get_restore_dir(self.master_datadir, self.backup_dir),
self.dump_dir, restore_timestamp[0:8],
"%s%s.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
else:
compressed_file = os.path.join(get_restore_dir(self.master_datadir, self.backup_dir),
self.dump_dir, restore_timestamp[0:8],
"%s%s_post_data.gz" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
compress = CheckFile(compressed_file).run()
return (restore_timestamp, restore_db, compress)
......@@ -652,6 +731,8 @@ def create_parser():
help="Different schema name to which tables will be restored")
addTo.add_option('--restore-stats', dest='restore_stats', action="callback", callback=restore_stats_callback,
help="Restore database statistics. Analysis is skipped as if the --noanalyze flag were set.")
# The --no-validate-table-name option is used only to separate the table restore and data restore phases; it is kept hidden
addTo.add_option('--no-validate-table-name', action='store_true', dest='no_validate_table_name', default=False, help=SUPPRESS_HELP)
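# For illustration (not part of the original commit): the per-table restore commands
# built internally by _build_gpdbrestore_cmd_line in gppylib/operations/restore.py
# pass this flag, since the parent invocation has already validated the table names:
#   gpdbrestore -t <ts> --table-file <file> -a -v --noplan --noanalyze --noaostats --no-validate-table-name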
parser.add_option_group(addTo)
# For Incremental Restore
......
......@@ -43,14 +43,14 @@ def list_to_quoted_string(filter_tables):
def convert_parents_to_leafs(dbname, parents):
partition_leaves_sql = """
SELECT x.partitionschemaname || '.' || x.partitiontablename
SELECT x.partitionschemaname || '.' || x.partitiontablename
FROM (
SELECT distinct schemaname, tablename, partitionschemaname, partitiontablename, partitionlevel
FROM pg_partitions
SELECT distinct schemaname, tablename, partitionschemaname, partitiontablename, partitionlevel
FROM pg_partitions
WHERE schemaname || '.' || tablename in (%s)
) as X,
(SELECT schemaname, tablename maxtable, max(partitionlevel) maxlevel
FROM pg_partitions
) as X,
(SELECT schemaname, tablename maxtable, max(partitionlevel) maxlevel
FROM pg_partitions
group by (tablename, schemaname)
) as Y
WHERE x.schemaname = y.schemaname and x.tablename = Y.maxtable and x.partitionlevel = Y.maxlevel;
......@@ -222,22 +222,30 @@ def get_dbname_from_cdatabaseline(line):
logger.error('Failed to find substring %s in line %s, error: %s' % (cdatabase, line, str(e)))
return None
with_template = " WITH TEMPLATE = "
all_positions = get_all_occurrences(with_template, line)
if all_positions != None:
keyword = " WITH TEMPLATE = "
pos = get_nonquoted_keyword_index(line, keyword, '"', len(keyword))
if pos != -1:
dbname = line[start+len(cdatabase) : pos]
return dbname
return None
def get_nonquoted_keyword_index(line, keyword, quote, keyword_len):
# quote can be single quote or double quote
all_positions = get_all_occurrences(keyword, line)
if all_positions != None and len(all_positions) > 0:
for pos in all_positions:
pre_string = line[:pos]
post_string = line[pos + len(with_template):]
double_quotes_before = get_all_occurrences('"', pre_string)
double_quotes_after = get_all_occurrences('"', post_string)
num_double_quotes_before = 0 if double_quotes_before is None else len(double_quotes_before)
num_double_quotes_after = 0 if double_quotes_after is None else len(double_quotes_after)
if num_double_quotes_before % 2 == 0 and num_double_quotes_after % 2 == 0:
dbname = line[start+len(cdatabase) : pos]
return dbname
return None
post_string = line[pos + keyword_len:]
quotes_before = get_all_occurrences('%s' % quote, pre_string)
quotes_after = get_all_occurrences('%s' % quote, post_string)
num_quotes_before = 0 if (quotes_before is None or len(quotes_before) == 0) else len(quotes_before)
num_quotes_after = 0 if (quotes_after is None or len(quotes_after) == 0) else len(quotes_after)
if num_quotes_before % 2 == 0 and num_quotes_after % 2 == 0:
return pos
return -1
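# Illustration (not part of the original commit): the parity check above skips keyword
# occurrences that sit inside quotes. For a line such as
#   CREATE DATABASE "db WITH TEMPLATE = x" WITH TEMPLATE = template0;
# the first " WITH TEMPLATE = " has one double quote before it (odd count), so it is
# skipped; the second has two quotes before it and none after (both even), so its
# index is returned.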
def get_all_occurrences(substr, line):
# substr is used to build the regexp pattern; callers must escape any regexp special characters in substr
if substr is None or line is None or len(substr) > len(line):
return None
return [m.start() for m in re.finditer('(?=%s)' % substr, line)]
......@@ -517,17 +525,30 @@ def get_timestamp_from_increments_filename(filename, dump_prefix):
raise Exception("Invalid increments file '%s' passed to get_timestamp_from_increments_filename" % filename)
return parts[-2].strip()
def get_full_timestamp_for_incremental(backup_dir, dump_dir, dump_prefix, incremental_timestamp):
pattern = '%s/%s/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]/%sgp_dump_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]_increments' % (backup_dir, dump_dir, dump_prefix)
increments_files = glob.glob(pattern)
def get_full_timestamp_for_incremental(master_datadir, dump_dir, dump_prefix, incremental_timestamp, backup_dir=None, ddboost=False, netbackup_service_host=None, netbackup_block_size=None):
full_timestamp = None
if netbackup_service_host:
full_timestamp = get_full_timestamp_for_incremental_with_nbu(dump_prefix, incremental_timestamp, netbackup_service_host, netbackup_block_size)
else:
if ddboost:
backup_dir = master_datadir
else:
backup_dir = get_restore_dir(master_datadir, backup_dir)
for increments_file in increments_files:
increment_ts = get_lines_from_file(increments_file)
if incremental_timestamp in increment_ts:
full_timestamp = get_timestamp_from_increments_filename(increments_file, dump_prefix)
return full_timestamp
pattern = '%s/%s/[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]/%sgp_dump_[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9]_increments' % (backup_dir, dump_dir, dump_prefix)
increments_files = glob.glob(pattern)
for increments_file in increments_files:
increment_ts = get_lines_from_file(increments_file)
if incremental_timestamp in increment_ts:
full_timestamp = get_timestamp_from_increments_filename(increments_file, dump_prefix)
break
if not full_timestamp:
raise Exception("Could not locate fullbackup associated with timestamp '%s'. Either increments file or full backup is missing." % incremental_timestamp)
return full_timestamp
return None
# backup_dir will be either MDD or some other directory depending on call
def get_latest_full_dump_timestamp(dbname, backup_dir, dump_dir, dump_prefix, ddboost=False):
......@@ -592,7 +613,7 @@ def check_funny_chars_in_names(names, is_full_qualified_name=True):
"""
'\n' inside table name makes it hard to specify the object name in shell command line,
this may be worked around by using table file, but currently we read input line by line.
'!' inside table name will mess up with the shell history expansion.
'!' inside table name will mess up with the shell history expansion.
',' is used for separating tables in plan file during incremental restore.
'.' dot is currently being used for full qualified table name in format: schema.table
"""
......@@ -720,6 +741,11 @@ def check_schema_exists(schema_name, dbname):
return False
return True
def unescape_string(string):
if string:
string = string.replace('\\\\', '\\').replace("''", "'")
return string
def isDoubleQuoted(string):
if len(string) > 2 and string[0] == '"' and string[-1] == '"':
return True
......@@ -748,7 +774,7 @@ def escapeDoubleQuoteInSQLString(string, forceDoubleQuote=True):
def removeEscapingDoubleQuoteInSQLString(string, forceDoubleQuote=True):
"""
Remove the escaping double quote in database/schema/table name.
Remove the escaping double quote in database/schema/table name.
"""
if string is None:
return string
......@@ -761,7 +787,7 @@ def removeEscapingDoubleQuoteInSQLString(string, forceDoubleQuote=True):
def formatSQLString(rel_file, isTableName=False):
"""
Read the full qualified schema or table name, do a split
Read the full qualified schema or table name, do a split
if each item is a table name into schema and table,
escape the double quote inside the name properly.
"""
......@@ -801,3 +827,59 @@ def remove_file_on_segments(master_port, filename, batch_default=DEFAULT_NUM_WOR
run_pool_command(addresses, cmd, batch_default, check_results=False)
except Exception as e:
logger.error("cleaning up file failed: %s" % e.__str__())
def get_restore_dir(data_dir, backup_dir):
if backup_dir is not None:
return backup_dir
else:
return data_dir
def get_table_info(line):
"""
Splitting is ambiguous when the table name/schema name/user name/tablespace name
contains the full text of one of the others. That is very unlikely, but if it
happens, return None.
Since we only care about the table name, type, and schema name, stripping the
input is safe here.
line: contains the true (un-escaped) schema name, table name, and user name.
"""
COMMENT_EXPR = '-- Name: '
TYPE_EXPR = '; Type: '
SCHEMA_EXPR = '; Schema: '
OWNER_EXPR = '; Owner: '
TABLESPACE_EXPR = '; Tablespace: '
temp = line.strip('\n')
type_start = get_all_occurrences(TYPE_EXPR, temp)
schema_start = get_all_occurrences(SCHEMA_EXPR, temp)
owner_start = get_all_occurrences(OWNER_EXPR, temp)
tblspace_start = get_all_occurrences(TABLESPACE_EXPR, temp)
if len(type_start) != 1 or len(schema_start) != 1 or len(owner_start) != 1:
return (None, None, None, None)
name = temp[len(COMMENT_EXPR) : type_start[0]]
type = temp[type_start[0] + len(TYPE_EXPR) : schema_start[0]]
schema = temp[schema_start[0] + len(SCHEMA_EXPR) : owner_start[0]]
if not tblspace_start:
tblspace_start.append(None)
owner = temp[owner_start[0] + len(OWNER_EXPR) : tblspace_start[0]]
return (name, type, schema, owner)
def get_master_dump_file(master_datadir, backup_dir, dump_dir, timestamp, dump_prefix, ddboost):
"""
Generate the path to the master dump file for DDBoost, local cluster, and NetBackup dumps;
this function does not generate paths for other remote dump locations.
Currently NetBackup and local dumps share the same backup directory,
while DDBoost uses a different one.
"""
dump_file_name = "%s%s" % (generate_master_dbdump_prefix(dump_prefix), timestamp)
if ddboost:
dump_file = os.path.join(dump_dir, timestamp[0:8], dump_file_name)
else:
dump_file = os.path.join(get_restore_dir(master_datadir, backup_dir), dump_dir, timestamp[0:8], dump_file_name)
return dump_file
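# Illustration (not part of the original commit), assuming dump_dir 'db_dumps' and
# timestamp '20160101120000':
#   ddboost:           db_dumps/20160101/<master dbdump prefix>20160101120000
#   local / netbackup: <restore dir>/db_dumps/20160101/<master dbdump prefix>20160101120000
# where <restore dir> is backup_dir if given, otherwise master_datadir, and the caller
# appends '.gz' when the dump is compressed.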
......@@ -27,7 +27,8 @@ from gppylib.operations.backup_utils import check_backup_type, check_dir_writabl
get_full_timestamp_for_incremental_with_nbu, get_lines_from_file, restore_file_with_nbu, run_pool_command, scp_file_to_hosts, \
verify_lines_in_file, write_lines_to_file, split_fqn, escapeDoubleQuoteInSQLString, get_dbname_from_cdatabaseline, \
checkAndRemoveEnclosingDoubleQuote, checkAndAddEnclosingDoubleQuote, removeEscapingDoubleQuoteInSQLString, \
create_temp_file_with_schemas, check_funny_chars_in_names, remove_file_on_segments
create_temp_file_with_schemas, check_funny_chars_in_names, remove_file_on_segments, get_restore_dir, get_nonquoted_keyword_index, \
unescape_string, get_table_info
from gppylib.operations.unix import CheckFile, CheckRemoteDir, MakeRemoteDir, CheckRemotePath
from re import compile, search, sub
......@@ -98,12 +99,6 @@ def update_ao_statistics(master_port, dbname, restored_tables, restored_schema=[
logger.info("Error updating ao statistics after restore")
raise e
def get_restore_dir(data_dir, backup_dir):
if backup_dir is not None:
return backup_dir
else:
return data_dir
def get_restore_tables_from_table_file(table_file):
if not os.path.isfile(table_file):
raise Exception('Table file does not exist "%s"' % table_file)
......@@ -171,15 +166,7 @@ def create_restore_plan(master_datadir, backup_dir, dump_dir, dump_prefix, db_ti
table_set_from_metadata_file = [schema + '.' + table for schema, table in dump_tables]
if ddboost:
full_timestamp = get_full_timestamp_for_incremental(master_datadir, dump_dir, dump_prefix, db_timestamp)
elif netbackup_service_host:
full_timestamp = get_full_timestamp_for_incremental_with_nbu(dump_prefix, db_timestamp, netbackup_service_host, netbackup_block_size)
else:
full_timestamp = get_full_timestamp_for_incremental(get_restore_dir(master_datadir, backup_dir), dump_dir, dump_prefix, db_timestamp)
if not full_timestamp:
raise Exception("Could not locate fullbackup associated with ts '%s'. Either increments file or fullback is missing." % db_timestamp)
full_timestamp = get_full_timestamp_for_incremental(master_datadir, dump_dir, dump_prefix, db_timestamp, backup_dir, ddboost, netbackup_service_host, netbackup_block_size)
incremental_restore_timestamps = get_incremental_restore_timestamps(master_datadir, backup_dir, dump_dir, dump_prefix, full_timestamp, db_timestamp)
......@@ -414,7 +401,7 @@ def statistics_file_dumped(master_datadir, backup_dir, dump_dir, dump_prefix, re
def _build_gpdbrestore_cmd_line(ts, table_file, backup_dir, redirected_restore_db, report_status_dir, dump_prefix, ddboost=False, netbackup_service_host=None,
netbackup_block_size=None, change_schema=None, schema_level_restore_file=None):
cmd = 'gpdbrestore -t %s --table-file %s -a -v --noplan --noanalyze --noaostats' % (ts, table_file)
cmd = 'gpdbrestore -t %s --table-file %s -a -v --noplan --noanalyze --noaostats --no-validate-table-name' % (ts, table_file)
if backup_dir is not None:
cmd += " -u %s" % backup_dir
if dump_prefix:
......@@ -686,7 +673,6 @@ class RestoreDatabase(Operation):
for tbl in analyze_list:
analyze_table = "analyze " + tbl
logger.info('analyze table statement is %s' % analyze_table)
try:
execSQL(conn, analyze_table)
except Exception as e:
......@@ -1154,7 +1140,18 @@ class ValidateTimestamp(Operation):
.format(ucfile=uncompressed_file))
return compress
def validate_timestamp_format(self):
if not self.candidate_timestamp:
raise Exception('Timestamp must not be None.')
else:
# the timestamp has to be a string of 14 digits (YYYYMMDDHHMMSS)
timestamp_pattern = compile(r'\d{14}')
if not search(timestamp_pattern, self.candidate_timestamp):
raise Exception('Invalid timestamp specified, please specify in the following format: YYYYMMDDHHMMSS.')
def execute(self):
self.validate_timestamp_format()
path = os.path.join(get_restore_dir(self.master_datadir, self.backup_dir), self.dump_dir, self.candidate_timestamp[0:8])
createdb_file = generate_createdb_filename(self.master_datadir, self.backup_dir, self.dump_dir, self.dump_prefix, self.candidate_timestamp, self.ddboost)
if not CheckFile(createdb_file).run():
......@@ -1197,7 +1194,7 @@ class ValidateSegments(Operation):
if not exists:
raise ExceptionNoStackTraceNeeded("No dump file on %s at %s" % (seg.getSegmentHostName(), path))
def validate_tablenames(table_list, master_data_dir, backup_dir, dump_dir, dump_prefix, timestamp, schema_level_restore_list=None):
def check_table_name_format_and_duplicate(table_list, schema_level_restore_list=None):
"""
Verify the table list and schema list, and resolve duplicates and overlaps.
"""
......@@ -1222,26 +1219,20 @@ def validate_tablenames(table_list, master_data_dir, backup_dir, dump_dir, dump_
table_set.add((schema, table))
restore_table_list.append(restore_table)
# validate tables
filename = generate_metadata_filename(master_data_dir, backup_dir, dump_dir, dump_prefix, timestamp)
return restore_table_list, schema_level_restore_list
dumped_tables = []
lines = get_lines_from_zipped_file(filename)
for line in lines:
pattern = "-- Name: (.+?); Type: (.+?); Schema: (.+?); Owner"
match = search(pattern, line)
if match is None:
continue
name, type, schema = match.group(1), match.group(2), match.group(3)
if type == "TABLE":
schema = pg.escape_string(schema)
name = pg.escape_string(name)
dumped_tables.append('%s.%s' % (schema, name))
for table in restore_table_list:
if table not in dumped_tables:
raise Exception("Table %s not found in backup" % table)
def validate_tablenames_exist_in_dump_file(restore_tables, dumped_tables):
unmatched_table_names = []
if dumped_tables:
dumped_table_names = [schema + '.' + table for (schema, table, _) in dumped_tables]
for table in restore_tables:
if table not in dumped_table_names:
unmatched_table_names.append(table)
else:
raise Exception('No dumped tables to restore.')
return restore_table_list, schema_level_restore_list
if len(unmatched_table_names) > 0:
raise Exception("Tables %s not found in backup" % unmatched_table_names)
class ValidateRestoreTables(Operation):
def __init__(self, restore_tables, restore_db, master_port):
......@@ -1396,105 +1387,158 @@ class RecoverRemoteDumps(Operation):
self.pool.join()
self.pool.check_results()
class GetDumpTables(Operation):
def __init__(self, restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, ddboost, netbackup_service_host):
class GetDumpTablesOperation(Operation):
def __init__(self, restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress):
self.master_datadir = master_datadir
self.restore_timestamp = restore_timestamp
self.dump_dir = dump_dir
self.dump_prefix = dump_prefix
self.backup_dir = backup_dir
self.ddboost = ddboost
self.grep_cmdStr = ''' | grep -e "-- Name: " -e "^\W*START (" -e "^\W*PARTITION " -e "^\W*DEFAULT PARTITION " -e "^\W*SUBPARTITION " -e "^\W*DEFAULT SUBPARTITION "'''
self.compress = compress
self.gunzip_maybe = ' | gunzip' if self.compress else ''
def extract_dumped_tables(self, lines):
schema = ''
owner = ''
table = ''
ret = []
for line in lines:
if line.startswith("-- Name: "):
table, table_type, schema, owner = get_table_info(line)
if table_type in ["TABLE", "EXTERNAL TABLE"]:
ret.append((schema, table, owner))
else:
line = line.strip()
if (line.startswith("START (") or line.startswith("DEFAULT PARTITION ") or line.startswith("PARTITION ") or
line.startswith("SUBPARTITION ") or line.startswith("DEFAULT SUBPARTITION ")):
keyword = " WITH \(tablename=E"
# pass len(keyword) - 1 below because the keyword escapes '(' with an extra backslash (\), so the matched text is one character shorter
pos = get_nonquoted_keyword_index(line, keyword, "'", len(keyword) - 1)
if pos == -1:
keyword = " WITH \(tablename="
pos = get_nonquoted_keyword_index(line, keyword, "'", len(keyword) - 1)
if pos == -1:
continue
# len(keyword) is one longer than the matched text (extra backslash), so slicing at pos + len(keyword) also skips the opening single quote
table = line[pos + len(keyword) : line.rfind("'")]
# unescape table name to get the defined name in database
table = unescape_string(table)
ret.append((schema, table, owner))
return ret
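# Illustration (not part of the original commit): a leaf-partition line in the dump
# metadata looks roughly like
#   PARTITION jan08 START ('2008-01-01') END ('2008-02-01') WITH (tablename=E'sales_1_prt_jan08', ... )
# The branch above pulls out 'sales_1_prt_jan08', unescapes it, and pairs it with the
# schema and owner taken from the most recent "-- Name:" line.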
class GetDDboostDumpTablesOperation(GetDumpTablesOperation):
def __init__(self, restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress, dump_file):
self.dump_file = dump_file
super(GetDDboostDumpTablesOperation, self).__init__(restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress)
def execute(self):
ddboost_cmdStr = 'gpddboost --readFile --from-file=%s' % self.dump_file
cmdStr = ddboost_cmdStr + self.gunzip_maybe + self.grep_cmdStr
cmd = Command('DDBoost copy of master dump file', cmdStr)
cmd.run(validateAfter=True)
line_list = cmd.get_results().stdout.splitlines()
ret = self.extract_dumped_tables(line_list)
return ret
class GetNetBackupDumpTablesOperation(GetDumpTablesOperation):
def __init__(self, restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress, netbackup_service_host, dump_file):
self.netbackup_service_host = netbackup_service_host
self.dump_file = dump_file
super(GetNetBackupDumpTablesOperation, self).__init__(restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress)
def execute(self):
(restore_timestamp, restore_db, compress) = ValidateTimestamp(master_datadir = self.master_datadir,
backup_dir = self.backup_dir,
candidate_timestamp = self.restore_timestamp,
dump_dir = self.dump_dir,
dump_prefix = self.dump_prefix,
netbackup_service_host = self.netbackup_service_host,
ddboost = self.ddboost).run()
dump_file = os.path.join(get_restore_dir(self.master_datadir, self.backup_dir), self.dump_dir, restore_timestamp[0:8], "%s%s" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
if compress:
dump_file += '.gz'
nbu_cmdStr = 'gp_bsa_restore_agent --netbackup-service-host %s --netbackup-filename %s' % (self.netbackup_service_host, self.dump_file)
cmdStr = nbu_cmdStr + self.gunzip_maybe + self.grep_cmdStr
if self.ddboost:
from_file = os.path.join(self.dump_dir, restore_timestamp[0:8], "%s%s" % (generate_master_dbdump_prefix(self.dump_prefix), restore_timestamp))
if compress:
from_file += '.gz'
ret = []
schema = ''
owner = ''
if compress:
cmd = Command('DDBoost copy of master dump file',
'gpddboost --readFile --from-file=%s | gunzip | grep -e "SET search_path = " -e "-- Data for Name: " -e "COPY "'
% (from_file))
cmd = Command('NetBackup copy of master dump file', cmdStr)
cmd.run(validateAfter=True)
line_list = cmd.get_results().stdout.splitlines()
ret = self.extract_dumped_tables(line_list)
return ret
class GetLocalDumpTablesOperation(GetDumpTablesOperation):
def __init__(self, restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress, dump_file):
self.dump_file = dump_file
super(GetLocalDumpTablesOperation, self).__init__(restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress)
def execute(self):
ret = []
f = None
try:
if self.compress:
f = gzip.open(self.dump_file, 'r')
else:
cmd = Command('DDBoost copy of master dump file',
'gpddboost --readFile --from-file=%s | grep -e "SET search_path = " -e "-- Data for Name: " -e "COPY "'
% (from_file))
# TODO: The code below is duplicated from the code for local restore
# We need to refactor this. Probably use the same String IO interfaces
# to extract lines in both the cases.
cmd.run(validateAfter=True)
line_list = cmd.get_results().stdout.splitlines()
for line in line_list:
if line.startswith("SET search_path = "):
line = line[len("SET search_path = ") : ]
if ", pg_catalog;" in line:
schema = line[ : line.index(", pg_catalog;")]
else:
schema = "pg_catalog"
elif line.startswith("-- Data for Name: "):
owner = line[line.index("; Owner: ") + 9 : ].rstrip()
elif line.startswith("COPY "):
table = line[5:]
if table.rstrip().endswith(") FROM stdin;"):
if table.startswith("\""):
table = table[: table.index("\" (") + 1]
else:
table = table[: table.index(" (")]
else:
table = table[: table.index(" FROM stdin;")]
table = table.rstrip()
ret.append((schema, table, owner))
return ret
f = open(self.dump_file, 'r')
lines = f.readlines()
ret = self.extract_dumped_tables(lines)
finally:
if f is not None:
f.close()
return ret
class GetRemoteDumpTablesOperation(GetDumpTablesOperation):
def __init__(self, restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress, remote_host, dump_file):
self.host = remote_host
self.dump_file = dump_file
super(GetRemoteDumpTablesOperation, self).__init__(restore_timestamp, master_datadir, backup_dir, dump_dir, dump_prefix, compress)
def execute(self):
cat_cmdStr = 'cat %s%s' % (self.dump_file, self.gunzip_maybe)
get_remote_dump_tables = '''ssh %s %s%s''' % (self.host, cat_cmdStr, self.grep_cmdStr)
cmd = Command('Get remote copy of dumped tables', get_remote_dump_tables)
cmd.run(validateAfter=True)
line_list = cmd.get_results().stdout.splitlines()
return self.extract_dumped_tables(line_list)
class GetDumpTables():
def __init__(self, restore_timestamp, master_datadir, backup_dir,
dump_dir, dump_prefix, compress, ddboost,
netbackup_service_host, remote_host=None,
dump_file=None):
"""
backup_dir: user-specified backup directory (the -u option)
dump_dir: dump directory name, e.g. the DDBoost default dump directory
compress: whether the dump file is compressed
remote_host: a plain remote host where a dump file exists (not a DDBoost or NetBackup server)
dump_file: the path to the dump file, including the exact suffix (.gz) when compressed
"""
self.restore_timestamp = restore_timestamp
self.master_datadir = master_datadir
self.backup_dir = backup_dir
self.dump_dir = dump_dir
self.dump_prefix = dump_prefix
self.compress = compress
self.ddboost = ddboost
self.netbackup_service_host = netbackup_service_host
self.remote_hostname = remote_host
self.dump_file = dump_file
def get_dump_tables(self):
if self.ddboost:
get_dump_table_cmd = GetDDboostDumpTablesOperation(self.restore_timestamp, self.master_datadir, self.backup_dir,
self.dump_dir, self.dump_prefix, self.compress, self.dump_file)
elif self.netbackup_service_host:
get_dump_table_cmd = GetNetBackupDumpTablesOperation(self.restore_timestamp, self.master_datadir, self.backup_dir, self.dump_dir,
self.dump_prefix, self.compress, self.netbackup_service_host, self.dump_file)
elif self.remote_hostname:
get_dump_table_cmd = GetRemoteDumpTablesOperation(self.restore_timestamp, self.master_datadir, self.backup_dir, self.dump_dir,
self.dump_prefix, self.compress, self.remote_hostname, self.dump_file)
else:
f = None
schema = ''
owner = ''
ret = []
try:
if compress:
f = gzip.open(dump_file, 'r')
else:
f = open(dump_file, 'r')
while True:
line = f.readline()
if not line:
break
if line.startswith("SET search_path = "):
line = line[len("SET search_path = ") : ]
if ", pg_catalog;" in line:
schema = line[ : line.index(", pg_catalog;")]
else:
schema = "pg_catalog"
elif line.startswith("-- Data for Name: "):
owner = line[line.index("; Owner: ") + 9 : ].rstrip()
elif line.startswith("COPY "):
table = line[5:]
if table.rstrip().endswith(") FROM stdin;"):
if table.startswith("\""):
table = table[: table.index("\" (") + 1]
else:
table = table[: table.index(" (")]
else:
table = table[: table.index(" FROM stdin;")]
table = table.rstrip()
ret.append((schema, table, owner))
finally:
if f is not None:
f.close()
return ret
get_dump_table_cmd = GetLocalDumpTablesOperation(self.restore_timestamp, self.master_datadir, self.backup_dir,
self.dump_dir, self.dump_prefix, self.compress, self.dump_file)
return get_dump_table_cmd.run()
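
A rough usage sketch (illustrative only; the argument values mirror what gpdbrestore
passes in after gathering the dump information):

    dumped_tables = GetDumpTables(restore_timestamp=restore_timestamp,
                                  master_datadir=master_datadir,
                                  backup_dir=backup_dir,
                                  dump_dir=dump_dir,
                                  dump_prefix=dump_prefix,
                                  compress=compress,
                                  ddboost=ddboost,
                                  netbackup_service_host=netbackup_service_host,
                                  remote_host=dump_host,
                                  dump_file=dump_file).get_dump_tables()
    # dumped_tables is a list of (schema, table, owner) tuples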
......@@ -889,16 +889,16 @@ class BackupUtilsTestCase(unittest.TestCase):
def test_get_full_timestamp_for_incremental_00(self, m1):
backup_dir = 'home'
ts = '20130207133000'
full_ts = get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
self.assertEquals(full_ts, None)
with self.assertRaisesRegexp(Exception, "Could not locate fullbackup associated with timestamp '20130207133000'. Either increments file or full backup is missing."):
get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
@patch('glob.glob', return_value=['foo'])
@patch('gppylib.operations.backup_utils.get_lines_from_file', return_value=[])
def test_get_full_timestamp_for_incremental_01(self, m1, m2):
backup_dir = 'home'
ts = '20130207133000'
full_ts = get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
self.assertEquals(full_ts, None)
with self.assertRaisesRegexp(Exception, "Could not locate fullbackup associated with timestamp '20130207133000'. Either increments file or full backup is missing."):
get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
@patch('glob.glob', return_value=['/tmp/db_dumps/20130207/gp_dump_20130207093000_increments'])
@patch('gppylib.operations.backup_utils.get_lines_from_file', return_value=['20130207133001', '20130207133000'])
......@@ -1241,8 +1241,8 @@ class BackupUtilsTestCase(unittest.TestCase):
backup_dir = 'home'
ts = '20130207133000'
dump_prefix = 'foo_'
full_ts = get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
self.assertEquals(full_ts, None)
with self.assertRaisesRegexp(Exception, "Could not locate fullbackup associated with timestamp '20130207133000'. Either increments file or full backup is missing."):
get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
@patch('glob.glob', return_value=['foo'])
@patch('gppylib.operations.backup_utils.get_lines_from_file', return_value=[])
......@@ -1250,8 +1250,8 @@ class BackupUtilsTestCase(unittest.TestCase):
backup_dir = 'home'
ts = '20130207133000'
dump_prefix = 'foo_'
full_ts = get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
self.assertEquals(full_ts, None)
with self.assertRaisesRegexp(Exception, "Could not locate fullbackup associated with timestamp '20130207133000'. Either increments file or full backup is missing."):
get_full_timestamp_for_incremental(backup_dir, self.dump_dir, self.dump_prefix, ts)
@patch('glob.glob', return_value=['/tmp/db_dumps/20130207/foo_gp_dump_20130207093000_increments'])
@patch('gppylib.operations.backup_utils.get_lines_from_file', return_value=['20130207133001', '20130207133000'])
......
......@@ -1121,7 +1121,7 @@ Feature: Validate command line arguments
@backupfire
Scenario: Test gpcrondump dump deletion (-c option)
Given the test is initialized
And there is a "heap" table "public.heap_table" with compression "None" in "bkdb" with data
And there is a "heap" table "public.heap_table" with compression "None" in "bkdb" with data
And the user runs "gpcrondump -a -x bkdb"
And the full backup timestamp from gpcrondump is stored
And gpcrondump should return a return code of 0
......@@ -1460,9 +1460,9 @@ Feature: Validate command line arguments
And gpcrondump should return a return code of 0
And the timestamp from gpcrondump is stored
And all the data from "bkdb" is saved for verification
And the user runs gpdbrestore with the stored timestamp and options "-T public.heap_table -T public.invalid"
And the user runs gpdbrestore with the stored timestamp and options "-T public.heap_table -T public.invalid -q"
Then gpdbrestore should return a return code of 2
And gpdbrestore should print Invalid tables for -T option: The following tables were not found in plan file to stdout
And gpdbrestore should print Tables \[\'public.invalid\'\] not found in backup to stdout
@backupfire
Scenario: gpdbrestore -L with -u option
......@@ -1823,9 +1823,11 @@ Feature: Validate command line arguments
And the timestamp from gpcrondump is stored in a list
And the named pipe script for the "restore" is run for the files under " "
And all the data from "bkdb" is saved for verification
And the user runs gpdbrestore with the stored timestamp
And gpdbrestore should return a return code of 0
And verify that the data of "10" tables in "bkdb" is validated after restore
When the named pipe script for the "restore" is run for the files under " "
And the user runs gpdbrestore with the stored timestamp and options "-T public.ao_part_table"
Then gpdbrestore should print \[WARNING\]:-Skipping validation of tables in dump file due to the use of named pipes to stdout
And close all opened pipes
Scenario: Incremental Backup and Restore with -t filter for Full
......@@ -3035,6 +3037,7 @@ Feature: Validate command line arguments
Scenario: Funny characters in the table name or schema name for gpdbrestore
Given the test is initialized
And database "testdb" exists
And there is a "heap" table "public.table1" in "testdb" with data
When the user runs command "gpcrondump -a -x testdb"
And the timestamp from gpcrondump is stored
When the user runs gpdbrestore with the stored timestamp and options "--table-file gppylib/test/behave/mgmt_utils/steps/data/special_chars/funny_char_table.txt"
......@@ -3049,7 +3052,7 @@ Feature: Validate command line arguments
When the user runs command "gpdbrestore -s "A\\t\\n.,!1""
Then gpdbrestore should return a return code of 2
And gpdbrestore should print Name has an invalid character to stdout
When the user runs gpdbrestore with the stored timestamp and options "-T public.table --change-schema A\\t\\n.,!1"
When the user runs gpdbrestore with the stored timestamp and options "-T public.table1 --change-schema A\\t\\n.,!1"
Then gpdbrestore should return a return code of 2
And gpdbrestore should print Name has an invalid character to stdout
When the user runs gpdbrestore with the stored timestamp and options "-S A\\t\\n.,!1"
......@@ -3129,7 +3132,6 @@ Feature: Validate command line arguments
And verify that there is no table " co_T`~@#$%^&*()-+[{]}|\;: \'"/?><1 " in " DB`~@#$%^&*()_-+[{]}|\;: \'/?><;1 "
And the user runs command "dropdb " DB\`~@#\$%^&*()_-+[{]}|\\;: \\'/?><;1 ""
Scenario: gpcrondump with --schema-file, --exclude-schema-file, -s and -S option when schema name and database name contains special character
Given the test is initialized
And the user runs "psql -f gppylib/test/behave/mgmt_utils/steps/data/special_chars/create_special_database.sql template1"
......@@ -3355,9 +3357,9 @@ Feature: Validate command line arguments
When the user runs "gpcrondump -a -x bkdb"
Then gpcrondump should return a return code of 0
And the timestamp from gpcrondump is stored
When the user runs gpdbrestore with the stored timestamp and options "-T public.heap_table2"
When the user runs gpdbrestore with the stored timestamp and options "-T public.heap_table2 -q"
Then gpdbrestore should return a return code of 2
Then gpdbrestore should print Table public.heap_table2 not found in backup to stdout
Then gpdbrestore should print Tables \[\'public.heap_table2\'\] to stdout
Then gpdbrestore should not print Issue with 'ANALYZE' of restored table 'public.heap_table2' in 'bkdb' database to stdout
Scenario: Absolute path should be provided with -u option for gpcrondump
......
......@@ -2465,6 +2465,7 @@ def impl(context, timestamp_key, dir):
if not 'named pipe' in results.stdout:
raise Exception('Expected %s to be a named pipe' % filename)
@when('the named pipe script for the "{operation}" is run for the files under "{dump_directory}"')
@then('the named pipe script for the "{operation}" is run for the files under "{dump_directory}"')
def impl(context, operation, dump_directory):
dump_dir = dump_directory if len(dump_directory.strip()) != 0 else master_data_dir
......