提交 9dfaf11e 编写于 作者: S Stephen Wu 提交者: Nikhil Kak

Use leaked schema dropper in gpcheckcat

- gpcheckcat should return a return code of 0 if schemas are found/dropped
- Backfilled tests for leaked schema logging
- Also cleaned up typo in Makefile
上级 37e045f4
......@@ -82,7 +82,7 @@ install: generate_greenplum_path_file
mkdir -p $(prefix)/lib/python
mkdir -p $(prefix)/sbin
#symlink gpcheckcat from bin to bin/lib to mainitain backward compatibility
#symlink gpcheckcat from bin to bin/lib to maintain backward compatibility
if [ -f bin/gpcheckcat ]; then \
ln -sf ../gpcheckcat bin/lib/gpcheckcat; \
fi
......
......@@ -24,6 +24,7 @@ import sys
from datetime import datetime
from time import localtime, strftime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
REPAIR_SCRIPT = 'runsql_%s.sh' % TIMESTAMP
......@@ -35,6 +36,7 @@ try:
from pygresql.pgdb import DatabaseError
from pygresql import pg
from gpcheckcat_modules.unique_index_violation_check import UniqueIndexViolationCheck
from gpcheckcat_modules.leaked_schema_dropper import LeakedSchemaDropper
except ImportError, e:
sys.exit('Error: unable to import module: ' + str(e))
......@@ -1866,56 +1868,6 @@ def checkPGClass():
logger.error("...")
def getLeakedSchemas():
logger.info('-----------------------------------')
logger.info('Checking for leaked temporary schemas')
db = connect2(GV.cfg[1], utilityMode=False)
leaked_schemas = []
# This query does a union of all the leaked temp schemas on the master as well as all the segments.
# The first part of the query uses gp_dist_random which gets the leaked schemas from only the segments
# The second part of the query gets the leaked temp schemas from just the master
# The simpler form of this query that pushed the union into the
# inner select does not run correctly on 3.2.x
qry = '''
SELECT distinct nspname as schema
FROM (
SELECT nspname, replace(nspname, 'pg_temp_','')::int as sess_id
FROM gp_dist_random('pg_namespace')
WHERE nspname ~ '^pg_temp_[0-9]+'
) n LEFT OUTER JOIN pg_stat_activity x using (sess_id)
WHERE x.sess_id is null
UNION
SELECT nspname as schema
FROM (
SELECT nspname, replace(nspname, 'pg_temp_','')::int as sess_id
FROM pg_namespace
WHERE nspname ~ '^pg_temp_[0-9]+'
) n LEFT OUTER JOIN pg_stat_activity x using (sess_id)
WHERE x.sess_id is null
'''
try:
curs = db.query(qry)
if curs.ntuples() == 0:
logger.info('[OK] temporary schemas')
else:
GV.checkStatus = False
setError(ERROR_REMOVE)
logger.info('[FAIL] temporary schemas')
logger.error('found %d unbound temporary schemas' % curs.ntuples())
for row in curs.getresult():
logger.error(" ... %s" % row[0])
leaked_schemas.append(row[0])
except Exception, e:
setError(ERROR_NOREPAIR)
myprint('[ERROR] executing test: checkPGNamespace')
myprint(' Execution error: ' + str(e))
return leaked_schemas
#############
def checkPGNamespace():
# Check for objects in various catalogs that are in a schema that has
......@@ -2049,6 +2001,26 @@ def addDemoteConstraint(seg, repair_sequence):
#############
def drop_leaked_schemas(leaked_schema_dropper, dbname):
logger.info('-----------------------------------')
logger.info('Checking for leaked temporary schemas')
db_connection = connect(database=dbname)
try:
dropped_schemas = leaked_schema_dropper.drop_leaked_schemas(db_connection)
if not dropped_schemas:
logger.info('[OK] temporary schemas')
else:
logger.info('[FAIL] temporary schemas')
myprint("Found and dropped %d unbound temporary schemas" % len(dropped_schemas))
logger.error('Dropped leaked schemas \'%s\' in the database \'%s\'' % (dropped_schemas, dbname))
except Exception, e:
setError(ERROR_NOREPAIR)
myprint(' Execution error: ' + str(e))
finally:
db_connection.close()
def checkDepend():
# Check for dependencies on non-existent objects
logger.info('-----------------------------------')
......@@ -3839,31 +3811,6 @@ def checkOwnersRepair():
else:
return None, None
def dropLeakedSchemas(dbname):
leaked_schemas = getLeakedSchemas()
if len(leaked_schemas) <= 0:
return
myprint("Dropping leaked schemas '%s' in the database '%s' " % (leaked_schemas, dbname))
db = connect(database=dbname)
try:
for schema in leaked_schemas:
# the query will return unquoted schema names
# but we only search for schemas with name like pg_temp_[0-9](see the method getLeakedSchemas),
# so we don't really need to quote the sql statement
qry = 'DROP SCHEMA IF EXISTS "%s" CASCADE;\n' % schema
logger.info(qry)
db.query(qry)
except Exception, e:
setError(ERROR_NOREPAIR)
myprint(' Execution error: ' + str(e))
finally:
if db is not None:
db.close()
def checkPoliciesRepair():
# changes to distribution policies
if len(GV.Policies) > 0:
......@@ -4879,6 +4826,7 @@ if __name__ == '__main__':
GV.report_cfg = getReportConfiguration()
GV.max_content = max([GV.cfg[dbid]['content'] for dbid in GV.cfg])
GV.catalog = getCatalog()
leaked_schema_dropper = LeakedSchemaDropper()
for dbname in GV.alldb:
......@@ -4892,7 +4840,8 @@ if __name__ == '__main__':
% (GV.opt['-U'], GV.dbname, GV.report_cfg[-1]['port'], GV.version))
myprint('-------------------------------------------------------------------')
dropLeakedSchemas(dbname)
drop_leaked_schemas(leaked_schema_dropper, dbname)
if GV.opt['-R']:
name = GV.opt['-R']
try:
......
......@@ -15,11 +15,13 @@ Feature: gpcheckcat tests
And psql should print pg_temp_ to stdout
And psql should print (1 row) to stdout
When the user runs "gpcheckcat leak"
Then gpchekcat should return a return code of 0
And the user runs "psql leak -f gppylib/test/behave/mgmt_utils/steps/data/gpcheckcat/leaked_schema.sql"
Then psql should return a return code of 0
And psql should print (0 rows) to stdout
And verify that the schema "good_schema" exists in "leak"
And the user runs "dropdb leak"
And verify that a log was created by gpcheckcat in the user's "gpAdminLogs" directory
Scenario: gpcheckcat should report unique index violations
Given database "test_index" is dropped and recreated
......
......@@ -16,11 +16,14 @@ class GpCheckCatTestCase(GpTestCase):
self.subject = imp.load_source('gpcheckcat', gpcheckcat_file)
self.subject.logger = Mock(spec=['log', 'info', 'debug', 'error'])
self.db_connection = Mock(spec=[])
self.db_connection = Mock(spec=['close'])
self.unique_index_violation_check = Mock(spec=['runCheck'])
self.unique_index_violation_check.runCheck.return_value = []
self.leaked_schema_dropper = Mock(spec=['drop_leaked_schemas'])
self.leaked_schema_dropper.drop_leaked_schemas.return_value = []
# MagicMock: we are choosing to trust the implementation of GV.cfg
# If we wanted full coverage we would make this a normal Mock()
# and fully define its behavior
......@@ -54,7 +57,7 @@ class GpCheckCatTestCase(GpTestCase):
def test_running_unique_index_violation_check__when_no_violations_are_found__passes_the_check(self):
self.subject.runOneCheck('unique_index_violation')
self.assertEqual(self.subject.GV.checkStatus, True)
self.assertTrue(self.subject.GV.checkStatus)
self.subject.setError.assert_not_called()
def test_running_unique_index_violation_check__when_violations_are_found__fails_the_check(self):
......@@ -65,7 +68,7 @@ class GpCheckCatTestCase(GpTestCase):
self.subject.runOneCheck('unique_index_violation')
self.assertEqual(self.subject.GV.checkStatus, False)
self.assertFalse(self.subject.GV.checkStatus)
self.subject.setError.assert_any_call(self.subject.ERROR_NOREPAIR)
def test_checkcat_report__after_running_unique_index_violations_check__reports_violations(self):
......@@ -83,16 +86,33 @@ class GpCheckCatTestCase(GpTestCase):
self.assertIn(expected_message1, log_messages)
self.assertIn(expected_message2, log_messages)
def test_drop_leaked_schemas__drops_orphaned_and_leaked_schemas(self):
self.db_connection.mock_add_spec(['close', 'query'])
self.subject.getLeakedSchemas = Mock(return_value=["fake_leak_1", "fake_leak_2"])
def test_drop_leaked_schemas__when_no_leaked_schemas_exist__passes_gpcheckcat(self):
self.subject.drop_leaked_schemas(self.leaked_schema_dropper, self.db_connection)
self.subject.setError.assert_not_called()
def test_drop_leaked_schemas____when_leaked_schemas_exist__finds_and_drops_leaked_schemas(self):
self.leaked_schema_dropper.drop_leaked_schemas.return_value = ['schema1', 'schema2']
self.subject.drop_leaked_schemas(self.leaked_schema_dropper, self.db_connection)
self.leaked_schema_dropper.drop_leaked_schemas.assert_called_once_with(self.db_connection)
self.subject.dropLeakedSchemas(dbname="fake_db")
def test_drop_leaked_schemas__when_leaked_schemas_exist__passes_gpcheckcat(self):
self.leaked_schema_dropper.drop_leaked_schemas.return_value = ['schema1', 'schema2']
drop_query_expected_list = [call('DROP SCHEMA IF EXISTS \"fake_leak_1\" CASCADE;\n'),
call('DROP SCHEMA IF EXISTS \"fake_leak_2\" CASCADE;\n')]
self.db_connection.query.assert_has_calls(drop_query_expected_list)
self.subject.drop_leaked_schemas(self.leaked_schema_dropper, self.db_connection)
self.subject.setError.assert_not_called()
def test_drop_leaked_schemas__when_leaked_schemas_exist__reports_which_schemas_are_dropped(self):
self.leaked_schema_dropper.drop_leaked_schemas.return_value = ['schema1', 'schema2']
self.subject.drop_leaked_schemas(self.leaked_schema_dropper, "some_db_name")
expected_message = "Found and dropped 2 unbound temporary schemas"
log_messages = [args[0][1] for args in self.subject.logger.log.call_args_list]
self.assertIn(expected_message, log_messages)
if __name__ == '__main__':
run_tests()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册