未验证 提交 9f83fee5 编写于 作者: H Huiliang.liu 提交者: GitHub

support fast_match option in gpload config file (#5317)

- Add a fast_match option to the gpload config file. If both reuse_tables
and fast_match are true, gpload tries to fast-match an existing external
table (without checking columns). If reuse_tables is false and
fast_match is true, gpload prints a warning message.
上级 c8a03d20
......@@ -122,6 +122,7 @@ valid_tokens = {
"preload": {'parse_children': True, 'parent': 'gpload'},
"truncate": {'parse_children': False, 'parent': 'preload'},
"reuse_tables": {'parse_children': False, 'parent': 'preload'},
"fast_match": {'parse_children': False, 'parent': 'preload'},
"staging_table": {'parse_children': False, 'parent': 'preload'},
"sql": {'parse_children': True, 'parent': 'gpload'},
"before": {'parse_children': False, 'parent': 'sql'},
......@@ -2074,6 +2075,66 @@ class gpload:
self.log(self.DEBUG, "query used to identify reusable external relations: %s" % sql)
return sql
# Fast path to find out whether we have an existing external table in the
# catalog which could be reused for this operation. We only make sure the
# location, data format and error limit are the same; we don't check column
# names and types.
#
# This function returns the SQL to run in order to find out whether
# such a table exists.
#
def get_fast_match_exttable_query(self, locationStr, formatType, formatOpts, limitStr, schemaName, log_errors):
    """Return the SQL used to fast-match a reusable external table.

    The query selects at most one relname from pg_class joined with
    pg_exttable, restricted to external relations (relstorage = 'x') whose
    name matches 'ext_gpload_reusable_%'. A candidate has to agree on the
    entries of self.locations, the format type, the format options, the
    reject limit and the error-logging setting. Column names and types are
    not compared.

    locationStr -- not read by this method; the location predicates are
                   built from self.locations instead.
    formatType  -- only its first character is compared with pgext.fmttype.
    formatOpts  -- matched as a substring of pgext.fmtopts after stripping
                   trailing whitespace and applying quote_unident().
    limitStr    -- reject limit; a false value requires rejectlimit IS NULL.
    schemaName  -- schema to search; None means any table visible on the
                   current search path.
    log_errors  -- true requires pgext.fmterrtbl = pgext.reloid, false
                   requires pgext.fmterrtbl IS NULL.

    The generated SQL is also written to the log at DEBUG level.
    """
    sqlFormat = """select relname from pg_class
join
pg_exttable pgext
on(pg_class.oid = pgext.reloid)
%s
where
relstorage = 'x' and
relname like 'ext_gpload_reusable_%%' and
%s
"""
    # If schemaName is None, find the reusable ext table which is visible to
    # the current search path. Else find the reusable ext table under the
    # specific schema, which needs a join with pg_namespace.
    if schemaName is None:
        joinStr = ""
        conditionStr = "pg_table_is_visible(pg_class.oid)"
    else:
        joinStr = """join
pg_namespace pgns
on(pg_class.relnamespace = pgns.oid)"""
        # Double any embedded single quote so the schema name stays a valid
        # SQL string literal; names without quotes produce the same SQL as
        # before.
        conditionStr = "pgns.nspname = '%s'" % schemaName.replace("'", "''")

    clauses = [sqlFormat % (joinStr, conditionStr)]

    if log_errors:
        clauses.append(" and pgext.fmterrtbl = pgext.reloid ")
    else:
        clauses.append(" and pgext.fmterrtbl IS NULL ")

    # NOTE(review): only the first len(self.locations) array entries are
    # compared, so a table carrying additional trailing locations would
    # still match -- confirm whether that is intended.
    for index, location in enumerate(self.locations):
        # SQL arrays are 1-based, hence index + 1.
        clauses.append(" and pgext.urilocation[%s] = %s\n" % (index + 1, quote(location)))

    clauses.append("""and pgext.fmttype = %s
and pgext.writable = false
and pgext.fmtopts like %s """ % (quote(formatType[0]), quote("%" + quote_unident(formatOpts.rstrip()) + "%")))

    if limitStr:
        clauses.append("and pgext.rejectlimit = %s " % limitStr)
    else:
        clauses.append("and pgext.rejectlimit IS NULL ")

    clauses.append("limit 1;")
    sql = "".join(clauses)

    self.log(self.DEBUG, "query used to fast match external relations:\n %s" % sql)
    return sql
#
# Create a string from the following conditions to reuse staging table:
# 1. same target table
......@@ -2279,8 +2340,13 @@ class gpload:
else:
# process the single quotes in order to successfully find an existing external table to reuse.
self.formatOpts = self.formatOpts.replace("E'\\''","'\''")
sql = self.get_reuse_exttable_query(formatType, self.formatOpts,
limitStr, from_cols, self.extSchemaName, self.log_errors)
if self.fast_match:
sql = self.get_fast_match_exttable_query(locationStr, formatType, self.formatOpts,
limitStr, self.extSchemaName, self.log_errors)
else:
sql = self.get_reuse_exttable_query(formatType, self.formatOpts,
limitStr, from_cols, self.extSchemaName, self.log_errors)
resultList = self.db.query(sql.encode('utf-8')).getresult()
if len(resultList) > 0:
# found an external table to reuse. no need to create one. we're done here.
......@@ -2678,6 +2744,9 @@ class gpload:
if preload:
truncate = self.getconfig('gpload:preload:truncate',bool,False)
self.reuse_tables = self.getconfig('gpload:preload:reuse_tables',bool,False)
self.fast_match = self.getconfig('gpload:preload:fast_match',bool,False)
if self.reuse_tables == False and self.fast_match == True:
self.log(self.WARN, 'fast_match is ignored when reuse_tables is false!')
self.staging_table = self.getconfig('gpload:preload:staging_table', unicode, default=None)
if self.error_table:
self.log_errors = True
......
......@@ -95,7 +95,7 @@ d = mkpath('config')
if not os.path.exists(d):
os.mkdir(d)
def write_config_file(mode='insert', reuse_flag='',columns_flag='0',mapping='0',portNum='8081',database='reuse_gptest',host='localhost',formatOpts='text',file='data/external_file_01.txt',table='texttable',format='text',delimiter="'|'",escape='',quote='',truncate='False',log_errors=None, error_limit='0',error_table=None,externalSchema=None,staging_table=None):
def write_config_file(mode='insert', reuse_flag='',columns_flag='0',mapping='0',portNum='8081',database='reuse_gptest',host='localhost',formatOpts='text',file='data/external_file_01.txt',table='texttable',format='text',delimiter="'|'",escape='',quote='',truncate='False',log_errors=None, error_limit='0',error_table=None,externalSchema=None,staging_table=None,fast_match='false'):
f = open(mkpath('config/config_file'),'w')
f.write("VERSION: 1.0.0.1")
......@@ -177,6 +177,7 @@ def write_config_file(mode='insert', reuse_flag='',columns_flag='0',mapping='0',
f.write("\n - SCHEMA: "+externalSchema)
f.write("\n PRELOAD:")
f.write("\n - REUSE_TABLES: "+reuse_flag)
f.write("\n - FAST_MATCH: "+fast_match)
if staging_table:
f.write("\n - STAGING_TABLE: "+staging_table)
f.write("\n")
......@@ -425,7 +426,7 @@ class GPLoad_FormatOpts_TestCase(unittest.TestCase):
def test_00_gpload_formatOpts_setup(self):
"0 gpload setup"
for num in range(1,30):
for num in range(1,33):
f = open(mkpath('query%d.sql' % num),'w')
f.write("\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n"+"\! gpload -f "+mkpath('config/config_file')+ " -d reuse_gptest\n")
f.close()
......@@ -669,6 +670,27 @@ class GPLoad_FormatOpts_TestCase(unittest.TestCase):
write_config_file(mode='insert',reuse_flag='true',file='data_file.txt',log_errors=True, error_limit='100')
self.doTest(29)
def test_30_gpload_reuse_table_update_mode_with_fast_match(self):
    "30 gpload update mode with fast match"
    # Run drop_tables() first, then copy the input to the file name the
    # generated config points at.
    drop_tables()
    data_file = 'data_file.txt'
    copy_data('external_file_04.txt', data_file)
    # Update mode with both REUSE_TABLES and FAST_MATCH written as 'true'.
    config = dict(mode='update', reuse_flag='true', fast_match='true', file=data_file)
    write_config_file(**config)
    self.doTest(30)
def test_31_gpload_reuse_table_update_mode_with_fast_match_and_different_columns_number(self):
    "31 gpload update mode with fast match and different columns number"
    # NOTE(review): unlike the neighbouring fast_match tests there is no
    # drop_tables() call here -- presumably so an external table left by an
    # earlier run can still be fast-matched after the column is added;
    # confirm against the expected output for this test.
    psql_run(cmd="ALTER TABLE texttable ADD column n8 text",dbname='reuse_gptest')
    copy_data('external_file_08.txt','data_file.txt')
    write_config_file(mode='update',reuse_flag='true',fast_match='true',file='data_file.txt')
    self.doTest(31)
def test_32_gpload_update_mode_without_reuse_table_with_fast_match(self):
    "32 gpload update mode when reuse table is false and fast match is true"
    # Run drop_tables() first, then copy the input to the file name the
    # generated config points at.
    drop_tables()
    data_file = 'data_file.txt'
    copy_data('external_file_08.txt', data_file)
    # FAST_MATCH is written as 'true' while REUSE_TABLES is 'false'.
    config = dict(mode='update', reuse_flag='false', fast_match='true', file=data_file)
    write_config_file(**config)
    self.doTest(32)
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromTestCase(GPLoad_FormatOpts_TestCase)
runner = unittest.TextTestRunner(verbosity=2)
......
2017-04-10 07:07:08|INFO|gpload session started 2017-04-10 07:07:08
2017-04-10 07:07:08|INFO|setting schema 'public' for table 'texttable'
2017-04-10 07:07:08|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/gpload/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30
2017-04-10 07:07:08|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_afbaac0da7ced19791c9ab9c537f41d3
2017-04-10 07:07:08|INFO|did not find an external table to reuse. creating ext_gpload_reusable_4fc43864_1dbc_11e7_a4b1_0242ac110005
2017-04-10 07:07:08|INFO|running time: 0.08 seconds
2017-04-10 07:07:08|INFO|rows Inserted = 0
2017-04-10 07:07:08|INFO|rows Updated = 32
2017-04-10 07:07:08|INFO|data formatting errors = 0
2017-04-10 07:07:08|INFO|gpload succeeded
2017-04-10 07:07:08|INFO|gpload session started 2017-04-10 07:07:08
2017-04-10 07:07:08|INFO|setting schema 'public' for table 'texttable'
2017-04-10 07:07:08|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/gpload/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30
2017-04-10 07:07:08|INFO|reusing staging table staging_gpload_reusable_afbaac0da7ced19791c9ab9c537f41d3
2017-04-10 07:07:08|INFO|reusing external table ext_gpload_reusable_4fc43864_1dbc_11e7_a4b1_0242ac110005
2017-04-10 07:07:09|INFO|running time: 0.08 seconds
2017-04-10 07:07:09|INFO|rows Inserted = 0
2017-04-10 07:07:09|INFO|rows Updated = 32
2017-04-10 07:07:09|INFO|data formatting errors = 0
2017-04-10 07:07:09|INFO|gpload succeeded
2018-07-20 09:06:30|INFO|gpload session started 2018-07-20 09:06:30
2018-07-20 09:06:30|INFO|setting schema 'public' for table 'texttable'
2018-07-20 09:06:30|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30
2018-07-20 09:06:30|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_9faa546d615fa55cc3e9e2cee6f130b0
2018-07-20 09:06:30|INFO|reusing external table ext_gpload_reusable_30024be2_8bfc_11e8_83d4_0242ac110002
2018-07-20 09:06:30|ERROR|ERROR: column "n8" does not exist
LINE 1: ..."s2","s3","dt","n1","n2","n3","n4","n5","n6","n7","n8" FROM ...
^
encountered while running INSERT INTO staging_gpload_reusable_9faa546d615fa55cc3e9e2cee6f130b0 ("s1","s2","s3","dt","n1","n2","n3","n4","n5","n6","n7","n8") SELECT "s1","s2","s3","dt","n1","n2","n3","n4","n5","n6","n7","n8" FROM ext_gpload_reusable_30024be2_8bfc_11e8_83d4_0242ac110002
2018-07-20 09:06:30|INFO|rows Inserted = 0
2018-07-20 09:06:30|INFO|rows Updated = 0
2018-07-20 09:06:30|INFO|data formatting errors = 0
2018-07-20 09:06:30|INFO|gpload failed
2018-07-20 09:06:30|INFO|gpload session started 2018-07-20 09:06:30
2018-07-20 09:06:30|INFO|setting schema 'public' for table 'texttable'
2018-07-20 09:06:30|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30
2018-07-20 09:06:30|INFO|did not find a staging table to reuse. creating staging_gpload_reusable_9faa546d615fa55cc3e9e2cee6f130b0
2018-07-20 09:06:30|INFO|reusing external table ext_gpload_reusable_30024be2_8bfc_11e8_83d4_0242ac110002
2018-07-20 09:06:30|ERROR|ERROR: column "n8" does not exist
LINE 1: ..."s2","s3","dt","n1","n2","n3","n4","n5","n6","n7","n8" FROM ...
^
encountered while running INSERT INTO staging_gpload_reusable_9faa546d615fa55cc3e9e2cee6f130b0 ("s1","s2","s3","dt","n1","n2","n3","n4","n5","n6","n7","n8") SELECT "s1","s2","s3","dt","n1","n2","n3","n4","n5","n6","n7","n8" FROM ext_gpload_reusable_30024be2_8bfc_11e8_83d4_0242ac110002
2018-07-20 09:06:30|INFO|rows Inserted = 0
2018-07-20 09:06:30|INFO|rows Updated = 0
2018-07-20 09:06:30|INFO|data formatting errors = 0
2018-07-20 09:06:30|INFO|gpload failed
2018-07-20 09:11:14|INFO|gpload session started 2018-07-20 09:11:14
2018-07-20 09:11:14|INFO|setting schema 'public' for table 'texttable'
2018-07-20 09:11:14|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30
2018-07-20 09:11:14|WARN|fast_match is ignored when reuse_tables is false!
2018-07-20 09:11:14|INFO|running time: 0.28 seconds
2018-07-20 09:11:14|INFO|rows Inserted = 0
2018-07-20 09:11:14|INFO|rows Updated = 34
2018-07-20 09:11:14|INFO|data formatting errors = 0
2018-07-20 09:11:14|INFO|gpload succeeded
2018-07-20 09:11:14|INFO|gpload session started 2018-07-20 09:11:14
2018-07-20 09:11:14|INFO|setting schema 'public' for table 'texttable'
2018-07-20 09:11:14|INFO|started gpfdist -p 8081 -P 8082 -f "/home/gpadmin/workspace/gpdb/gpMgmt/bin/gpload_test/gpload2/data_file.txt" -t 30
2018-07-20 09:11:14|WARN|fast_match is ignored when reuse_tables is false!
2018-07-20 09:11:14|INFO|running time: 0.29 seconds
2018-07-20 09:11:14|INFO|rows Inserted = 0
2018-07-20 09:11:14|INFO|rows Updated = 34
2018-07-20 09:11:14|INFO|data formatting errors = 0
2018-07-20 09:11:14|INFO|gpload succeeded
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册