Commit 4ccf47cf authored by Tingfang Bao, committed by Ming LI

A better way to find out the sequence dependency for a table (#4996)

We find all of the sequences that the table depends on and check whether
   each sequence still needs to be created on the destination. We then get a
   single dump SQL script that includes both the sequence DDL and the table
   DDL, so the sequences are created before the table is (as sketched below).

   This avoids a lock conflict seen previously, where a table with a sequence was being dropped inside a transaction while `psql -f ...` was run at the same time to create that sequence.
Signed-off-by: Ming LI <mli@apache.org>
Parent 8d40268b
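Not part of the commit itself: a minimal, standalone sketch of the flow described above. It assumes `conn` is any DB-API connection, and `Relation`, `sequence_exists`, `relations_to_dump`, and `build_schema_dump_cmd` are illustrative names rather than gptransfer code. The catalog query and the per-relation `-t` option mirror what the diff below adds in `_doesSequenceExist` and `GpSchemaDump`.

```python
from collections import namedtuple

# Illustrative relation holder; the real code passes objects that
# expose .schema and .table attributes (hypothetical stand-in).
Relation = namedtuple('Relation', ['schema', 'table'])

# Same catalog query the patch uses to test whether a sequence exists.
SEQ_EXISTS_SQL = """
    SELECT a.relname
      FROM pg_class a, pg_namespace b
     WHERE a.relnamespace = b.oid
       AND b.nspname = %s
       AND a.relname = %s
       AND a.relkind = 'S'
"""

def sequence_exists(conn, schema, name):
    """Return True if the sequence schema.name exists on this connection."""
    cur = conn.cursor()
    try:
        cur.execute(SEQ_EXISTS_SQL, (schema, name))
        return cur.fetchone() is not None
    finally:
        cur.close()

def relations_to_dump(dest_conn, table, sequences):
    """Sequences missing on the destination first, then the table itself."""
    missing = [s for s in sequences
               if not sequence_exists(dest_conn, s.schema, s.table)]
    return missing + [table]

def build_schema_dump_cmd(host, port, user, database, relations):
    """Build one pg_dump call with a -t option per relation, as in the diff."""
    cmd = "pg_dump -s -x -O --gp-syntax -h %s -p %d -U %s" % (host, port, user)
    for rel in relations:
        cmd += " -t '\"%s\".\"%s\"'" % (rel.schema, rel.table)
    return cmd + " %s" % database
```

Because the missing sequences are listed ahead of the table, the resulting dump script emits the sequence DDL before the table DDL, so applying it once creates everything in dependency order.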
......@@ -1526,7 +1526,7 @@ class GpSchemaDump(Command):
"""
def __init__(self, name, host, port, user, full=True, database=None,
schema=None, table=None, ctxt=LOCAL, remoteHost=None, include_data = False):
tables=list(), ctxt=LOCAL, remoteHost=None, justIncludeData = False):
"""
name: name of the command
host: GPDB host
......@@ -1540,7 +1540,7 @@ class GpSchemaDump(Command):
cmdStr = None
self.full = full
if not full and (database is None or schema is None or table is None):
if not full and (database is None or tables[0].schema is None or tables[0].table is None):
raise Exception('database, schema and table are required for '
'table dump')
......@@ -1548,12 +1548,16 @@ class GpSchemaDump(Command):
cmdStr = 'pg_dumpall -s --gp-syntax -h %s -p %d -U %s' \
% (host, port, user)
else:
cmdStr = 'pg_dump -s -x -O --gp-syntax -h %s -p %d -U %s -t ' \
'\'\"%s\".\"%s\"\' %s' % (host, port, user, schema, table, database)
if justIncludeData:
cmdStr = 'pg_dump -a -x -O --gp-syntax -h %s -p %d -U %s ' % (host, port, user)
else:
cmdStr = 'pg_dump -s -x -O --gp-syntax -h %s -p %d -U %s ' % (host, port, user)
for table in tables:
tableOptions = " -t \'\"%s\".\"%s\"\' " % (table.schema, table.table)
cmdStr += tableOptions
if include_data:
cmdStr = 'pg_dump -x -O --gp-syntax -h %s -p %d -U %s -t ' \
'\'\"%s\".\"%s\"\' %s' % (host, port, user, schema, table, database)
cmdStr += " %s" % database
Command.__init__(self, name, cmdStr, ctxt, remoteHost)
......@@ -1698,8 +1702,6 @@ class GpTransferCommand(Command):
self._src_ready = Event()
self._dest_ready = Event()
self.seqs = []
self.create_seqs_before_table = []
self.create_seqs_after_table = []
Command.__init__(self, name, None, LOCAL, None)
......@@ -1727,8 +1729,6 @@ class GpTransferCommand(Command):
self._dest_conn = connect(url)
self.seqs = self._get_sequences()
self.create_seqs_before_table = []
self.create_seqs_after_table = []
if not self._dest_exists:
self._create_target_table()
elif self._truncate and not self._table_pair.dest.external:
......@@ -1750,11 +1750,7 @@ class GpTransferCommand(Command):
self._transfer_data()
if len(self.create_seqs_before_table) == 0 \
and len(self.create_seqs_after_table) == 0:
self._reset_sequence_nextval(self.seqs)
else:
self._reset_sequence_nextval(self.create_seqs_after_table)
self._reset_sequence_nextval(self.seqs)
if self._validator_class:
self._validate()
......@@ -1856,25 +1852,6 @@ FROM (
logger.info('Creating target table %s...', self._table_pair.dest)
schema_sql = self._get_source_table_schema()
sequence_dump_sql = ""
# find out the sequence that not included in schema_sql
for index, seq in enumerate(self.seqs):
if seq.schema.lower() == 'public':
target_str = "CREATE SEQUENCE %s" % seq.table
else:
target_str = "CREATE SEQUENCE %s.%s" % (seq.schema, seq.table)
sequence_dump_sql += "CREATE SCHEMA %s; \\n" % seq.schema
if schema_sql.find(target_str) == -1:
self.create_seqs_before_table.append(seq)
else:
self.create_seqs_after_table.append(seq)
# create the sequence in create_seqs_before_table
if len(self.create_seqs_before_table) > 0:
sequence_dump_sql += self._get_sequence_dump_sql(self.create_seqs_before_table)
self._create_sequences(sequence_dump_sql, use_psql_client=True)
cur = execSQL(self._dest_conn, schema_sql)
cur.close()
......@@ -1908,6 +1885,17 @@ FROM (
self._pool.empty_completed_items()
logger.info('Retrieving schema for table %s...',
self._table_pair.source)
allRelations = []
for seq in self.seqs:
if not doesSchemaExist(self._dest_conn, seq.schema):
execSQL(self._dest_conn, 'CREATE SCHEMA \"%s\"' % seq.schema)
if not self._doesSequenceExist(self._dest_conn, seq.schema, seq.table):
allRelations.append(seq)
allRelations.append(self._table_pair.source)
cmd = GpSchemaDump(
'schema dump of %s' % self._table_pair.source,
self._src_host,
......@@ -1915,8 +1903,7 @@ FROM (
self._src_user,
False,
self._table_pair.source.database,
self._table_pair.source.schema,
self._table_pair.source.table,
allRelations,
ctxt=REMOTE,
remoteHost=self._src_host
)
......@@ -1925,6 +1912,30 @@ FROM (
self._pool.check_results()
return cmd.get_schema_sql()
def _doesSequenceExist(self, conn, schema_name, seq_name):
sql = '''
select
a.relname
from
pg_class a, pg_namespace b
where
a.relnamespace = b.oid
and b.nspname='%s'
and a.relname='%s'
and a.relkind = 'S';
''' % (schema_name, seq_name)
cursor=None
try:
cursor=dbconn.execSQL(conn,sql)
numrows = cursor.rowcount
if numrows == 0:
return False
elif numrows == 1:
return True
finally:
if cursor:
cursor.close()
def _get_sequences(self):
"""
Gets the dependent sequence including implicit (serial column type) and
......@@ -1957,43 +1968,38 @@ FROM (
return sequences
def _get_sequence_dump_sql(self, sequences):
def _get_sequence_data_dump_sql(self, sequences):
if len(sequences) == 0:
return ""
ret = list()
for seq in sequences:
self._pool.empty_completed_items()
self._pool.empty_completed_items()
logger.info('Checking sequences for table %s...',
self._table_pair.source)
# Just dump the sequence metadata and data,
# then import them into the destination cluster
cmd = GpSchemaDump(
'schema dump of %s' % self._table_pair.source,
self._src_host,
self._src_port,
self._src_user,
False,
self._table_pair.source.database,
seq.schema,
seq.table,
ctxt=REMOTE,
remoteHost=self._src_host,
include_data=True
)
self._pool.addCommand(cmd)
self._pool.join()
self._pool.check_results()
logger.info('Checking sequences for table %s...',
self._table_pair.source)
# Just dump the sequence data,
cmd = GpSchemaDump(
'schema dump of %s' % self._table_pair.source,
self._src_host,
self._src_port,
self._src_user,
False,
self._table_pair.source.database,
list(sequences),
ctxt=REMOTE,
remoteHost=self._src_host,
justIncludeData=True
)
ret.append(cmd.get_schema_sql())
self._pool.addCommand(cmd)
self._pool.join()
self._pool.check_results()
return cmd.get_schema_sql()
query = '\\n'.join(ret)
return query
def _create_sequences(self, sequence_dump_sql, use_psql_client=False):
def _update_sequences(self, sequence_dump_sql, use_psql_client=False):
"""
Gets the SQL to create the dependent sequence from the source GPDB system.
use_psql_client will use psql to execute sql
......@@ -2030,8 +2036,8 @@ FROM (
def _reset_sequence_nextval(self, sequences):
if len(sequences) > 0:
sequence_dump_sql = self._get_sequence_dump_sql(sequences)
self._create_sequences(sequence_dump_sql, use_psql_client=True)
sequence_data_dump_sql = self._get_sequence_data_dump_sql(sequences)
self._update_sequences(sequence_data_dump_sql)
def _get_named_pipes(self):
"""
......