Commit cba20278 · Author: Pengcheng Tang

gptransfer: transfer leaf partition tables sequentially when batch-size is set to 1

gptransfer starts multiple worker-pool threads to transfer the source and destination
pairs. To preserve the order specified by the user, batch-size should be set to 1.
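
Why a pool of size 1 preserves order can be illustrated with a small sketch. It does not use gptransfer's own worker pool; `concurrent.futures.ThreadPoolExecutor` (Python 3) stands in for it, and `transfer_all` and the table names are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def transfer_all(pairs, batch_size):
    """Run a stand-in 'transfer' per (source, dest) pair; return completion order."""
    completed = []
    lock = threading.Lock()

    def transfer(pair):
        # A real transfer would copy data here; this only records completion.
        with lock:
            completed.append(pair)

    # batch_size plays the role of --batch-size: the number of concurrent workers.
    with ThreadPoolExecutor(max_workers=batch_size) as pool:
        for pair in pairs:
            pool.submit(transfer, pair)
    return completed


pairs = [("db.public.t%d" % i, "db.public.t%d" % i) for i in range(5)]
# A single worker takes tasks from a FIFO queue, so completion order
# matches submission order. With more workers, no order is guaranteed.
print(transfer_all(pairs, batch_size=1) == pairs)
```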

Partition table transfer used a dictionary to map each source table to its destination,
and a dictionary does not preserve insertion order. Because gpdb uses Python 2.6 and
OrderedDict is only available from 2.7, this commit works around it with lists.
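
The list-based workaround can be sketched as two parallel lists whose indexes pair each source with its destination. This is a simplified illustration rather than the code in the diff: `read_pairs` is a hypothetical helper, and it assumes each input line holds exactly one `source, destination` pair.

```python
def read_pairs(lines):
    """Collect (source, destination) pairs in input order, skipping exact duplicates."""
    src_tables = []
    dest_tables = []
    for line in lines:
        src, dest = [part.strip() for part in line.split(',')]
        # The same source may appear with different destinations, so a pair
        # is a duplicate only when both halves match an earlier entry.
        if any(s == src and d == dest for s, d in zip(src_tables, dest_tables)):
            continue
        # Index i in src_tables pairs with index i in dest_tables.
        src_tables.append(src)
        dest_tables.append(dest)
    return src_tables, dest_tables


lines = [
    "gptest.public.a, gptest.public.x",
    "gptest.public.b, gptest.public.y",
    "gptest.public.a, gptest.public.x",  # exact duplicate, skipped
]
src_tables, dest_tables = read_pairs(lines)
for src, dest in zip(src_tables, dest_tables):
    print("%s -> %s" % (src, dest))
```

Lists keep insertion order on every Python version, which is why they can replace the dictionary without depending on OrderedDict.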

Authors: Pengcheng Tang, Lawrence Hamel
Parent 0be2e5b0
......@@ -463,33 +463,13 @@ Feature: gptransfer tests
And the database "gptransfer_testdb3" does not exist
And the database "gptransfer_testdb4" does not exist
And the database "gptransfer_testdb5" does not exist
And the user runs "gptransfer --source-port $GPTRANSFER_SOURCE_PORT --source-host $GPTRANSFER_SOURCE_HOST --source-user $GPTRANSFER_SOURCE_USER --dest-user $GPTRANSFER_DEST_USER --dest-port $GPTRANSFER_DEST_PORT --dest-host $GPTRANSFER_DEST_HOST --source-map-file $GPTRANSFER_MAP_FILE --validate md5 -f gppylib/test/behave/mgmt_utils/steps/data/gptransfer_infile"
And the user runs "gptransfer --source-port $GPTRANSFER_SOURCE_PORT --source-host $GPTRANSFER_SOURCE_HOST --source-user $GPTRANSFER_SOURCE_USER --dest-user $GPTRANSFER_DEST_USER --dest-port $GPTRANSFER_DEST_PORT --dest-host $GPTRANSFER_DEST_HOST --source-map-file $GPTRANSFER_MAP_FILE --validate md5 -f gppylib/test/behave/mgmt_utils/steps/data/gptransfer_infile --batch-size=1"
Then gptransfer should return a return code of 0
And verify that gptransfer is in order of "gppylib/test/behave/mgmt_utils/steps/data/gptransfer_infile"
And verify that gptransfer is in order of "gppylib/test/behave/mgmt_utils/steps/data/gptransfer_infile" when partition transfer is "None"
And verify that table "t0" in "gptransfer_testdb1" has "100" rows
And verify that table "t3" in "gptransfer_testdb2" has "400" rows
And verify that table "t0" in "gptransfer_testdb3" has "700" rows
@T506748
Scenario: gptransfer input file
Given the database is running
And the database "gptransfer_destdb" does not exist
And the database "gptransfer_testdb1" does not exist
And the database "gptransfer_testdb2" does not exist
And the database "gptransfer_testdb3" does not exist
And the database "gptransfer_testdb4" does not exist
And the database "gptransfer_testdb5" does not exist
And the user runs "gptransfer --source-port $GPTRANSFER_SOURCE_PORT --source-host $GPTRANSFER_SOURCE_HOST --source-user $GPTRANSFER_SOURCE_USER --dest-user $GPTRANSFER_DEST_USER --dest-port $GPTRANSFER_DEST_PORT --dest-host $GPTRANSFER_DEST_HOST --source-map-file $GPTRANSFER_MAP_FILE --validate md5 -f gppylib/test/behave/mgmt_utils/steps/data/gptransfer_wildcard_infile"
Then gptransfer should return a return code of 0
And verify that gptransfer is in order of "gppylib/test/behave/mgmt_utils/steps/data/gptransfer_wildcard_infile"
And verify that table "t0" in "gptransfer_testdb1" has "100" rows
And verify that table "t1" in "gptransfer_testdb1" has "200" rows
And verify that table "t2" in "gptransfer_testdb1" has "300" rows
And verify that table "t3" in "gptransfer_testdb2" has "400" rows
And verify that table "t4" in "gptransfer_testdb2" has "500" rows
And verify that table "s1.t0" in "gptransfer_testdb3" has "800" rows
And verify that table "s2.t0" in "gptransfer_testdb3" has "900" rows
@T886748
Scenario: gptransfer -F exclude input file
Given the database is running
......@@ -2210,6 +2190,19 @@ Feature: gptransfer tests
Then gptransfer should return a return code of 2
And gptransfer should print Destination table name "gptest.public.tbl.extra_tablename" isn't fully qualified format to stdout
@partition_transfer
@prt_transfer_46
Scenario: gptransfer partition with batch size 1, tables transferred in sequential order
Given the database is running
And database "gptest" exists
And database "gptest" is created if not exists on host "GPTRANSFER_SOURCE_HOST" with port "GPTRANSFER_SOURCE_PORT" with user "GPTRANSFER_SOURCE_USER"
And the user runs "psql -p $GPTRANSFER_SOURCE_PORT -h $GPTRANSFER_SOURCE_HOST -U $GPTRANSFER_SOURCE_USER -f gppylib/test/behave/mgmt_utils/steps/data/gptransfer/two_level_range_list_prt_1.sql -d gptest"
And the user runs "psql -p $GPTRANSFER_DEST_PORT -h $GPTRANSFER_DEST_HOST -U $GPTRANSFER_DEST_USER -f gppylib/test/behave/mgmt_utils/steps/data/gptransfer/two_level_range_list_prt_1.sql -d gptest"
And there is a file "input_file" with tables "gptest.public.sales_1_prt_2_2_prt_asia, gptest.public.sales_1_prt_2_2_prt_asia|gptest.public.sales_1_prt_3_2_prt_asia, gptest.public.sales_1_prt_3_2_prt_asia|gptest.public.sales_1_prt_2_2_prt_other_regions, gptest.public.sales_1_prt_2_2_prt_other_regions"
When the user runs "gptransfer -f input_file --partition-transfer --source-port $GPTRANSFER_SOURCE_PORT --source-host $GPTRANSFER_SOURCE_HOST --source-user $GPTRANSFER_SOURCE_USER --dest-user $GPTRANSFER_DEST_USER --dest-port $GPTRANSFER_DEST_PORT --dest-host $GPTRANSFER_DEST_HOST --source-map-file $GPTRANSFER_MAP_FILE --batch-size=1"
Then gptransfer should return a return code of 0
And verify that gptransfer is in order of "input_file" when partition transfer is "True"
@gptransfer_help
Scenario: use gptransfer --help with another gptransfer process already running.
Given the database is running
......
......@@ -3699,12 +3699,13 @@ def impl(context, filepath, line):
if line in open(filepath).read():
raise Exception("The file '%s' does contain '%s'" % (filepath, line))
@then('verify that gptransfer is in order of "{filepath}"')
def impl(context, filepath):
@then('verify that gptransfer is in order of "{filepath}" when partition transfer is "{is_partition_transfer}"')
def impl(context, filepath, is_partition_transfer):
table = []
with open(filepath) as f:
input_file = f.read().splitlines()
table = [x.replace('/', "") for x in input_file]
table = f.read().splitlines()
if is_partition_transfer != "None":
table = [x.split(',')[0] for x in table]
split_message = re.findall("Starting transfer of.*\n", context.stdout_message)
......
......@@ -2360,6 +2360,8 @@ class GpTransfer(object):
self._src_databases = None
self._dest_databases = None
self._src_dest_partition_table_mapping = None
self._all_src_databases = get_user_databases(self._options.source_host,
self._options.source_port,
self._options.source_user)
......@@ -3151,6 +3153,9 @@ class GpTransfer(object):
# --partition-transfer will have the exact database name for each destination table,
# not support the regexp
if self._options.partition_transfer:
self._src_dest_partition_table_mapping = dict()
self._get_tables_from_partition_input_file()
user_tables = get_user_tables(self._options.source_host,
self._options.source_port,
......@@ -3158,11 +3163,22 @@ class GpTransfer(object):
self._src_databases,
False,
self._options.partition_transfer)
# filter out non-exist source tables from source databases.
for tbl in self.src_dest_partition_table_mapping:
# key in dict: self._src_dest_partition_table_mapping maps to index of list: source_tables
key = 0
for i in range(len(self._src_prt_tables)):
tbl = self._src_prt_tables[i]
for user_tbl in user_tables:
if tbl == str(user_tbl):
source_tables.append(user_tbl)
self._src_dest_partition_table_mapping[key] = self._dest_prt_tables[i]
key += 1
break
del self._src_prt_tables[:]
del self._dest_prt_tables[:]
return source_tables
if self._options.full:
......@@ -3285,7 +3301,8 @@ class GpTransfer(object):
read comma ',' separated partition table pair: <source, destination>,
populate all source-destination pairs with their exact name(does not expect regexp).
"""
self.src_dest_partition_table_mapping = dict()
self._src_prt_tables = []
self._dest_prt_tables = []
self._dest_databases = []
self._src_databases = []
fqn_format = '<db_name>.<schema_name>.<table_name>'
......@@ -3326,14 +3343,17 @@ class GpTransfer(object):
self._options.source_port == self._options.dest_port):
raise Exception('Cannot transfer between same partition table %s' % src_table)
if (src_table in self.src_dest_partition_table_mapping and
self.src_dest_partition_table_mapping[src_table] == dest_table):
if src_table in self._src_prt_tables:
indexes = [i for i, tbl in enumerate(self._src_prt_tables) if tbl == src_table]
for i in indexes:
if self._dest_prt_tables[i] == dest_table:
logger.warning('Duplicate entries found, source partition table %s, destination partition table %s ' %
(src_table, dest_table))
continue
logger.debug('Getting source partition table "%s", destination partition table "%s"' % (src_table, dest_table))
self.src_dest_partition_table_mapping[src_table] = dest_table
self._src_prt_tables.append(src_table)
self._dest_prt_tables.append(dest_table)
src_db = split_fqn(src_table)[0]
if src_db not in self._src_databases:
......@@ -3442,10 +3462,13 @@ class GpTransfer(object):
table_pairs = list()
seen = set()
for src_table in self._src_tables:
for i in range(len(self._src_tables)):
src_table = self._src_tables[i]
if self._options.partition_transfer:
src_tbl_key = str(src_table)
dest_db, dest_schema, dest_tbl = split_fqn(self.src_dest_partition_table_mapping[src_tbl_key])
dest_prt_table = self._src_dest_partition_table_mapping[i]
dest_db, dest_schema, dest_tbl = split_fqn(dest_prt_table)
table_pair = GpTransferTablePair(src_table,
GpTransferTable(dest_db,
dest_schema,
......@@ -3466,6 +3489,9 @@ class GpTransfer(object):
seen.add(table_pair)
table_pairs.append(table_pair)
if self._src_dest_partition_table_mapping:
self._src_dest_partition_table_mapping.clear()
logger.info('Number of tables to transfer: %s', len(table_pairs))
return table_pairs
......
......@@ -228,6 +228,9 @@ OPTIONS
the destination database. If not specified, the default is 2. The
maximum is 10.
When set to 1, it starts transferring of all tables based on the order
specified from -t and -f option.
-d <database>
A source database to copy. This option can be specified multiple times
......