diff --git a/gpMgmt/bin/gppylib/db/test/unit/test_cluster_dbconn.py b/gpMgmt/bin/gppylib/db/test/unit/test_cluster_dbconn.py index 28dc242139a5a1273b43fc52bf8468d9a41cc8e5..9e76e35692027f99ea59f699aa671b5d9c96d2fe 100644 --- a/gpMgmt/bin/gppylib/db/test/unit/test_cluster_dbconn.py +++ b/gpMgmt/bin/gppylib/db/test/unit/test_cluster_dbconn.py @@ -8,9 +8,17 @@ from gppylib.db import dbconn class ConnectTestCase(unittest.TestCase): """A test case for dbconn.connect().""" - def setUp(self): + @classmethod + def setUpClass(cls): # Connect to the database pointed to by PGHOST et al. - self.url = dbconn.DbURL() + with dbconn.connect(dbconn.DbURL()) as conn: + # using the pg.DB connection so that each SQL is done as a single + # transaction + db = pg.DB(conn) + test_database_name = "gpdb_test_database" + db.query("DROP DATABASE IF EXISTS %s" % test_database_name) + db.query("CREATE DATABASE %s" % test_database_name) + cls.url = dbconn.DbURL(dbname=test_database_name) def _raise_warning(self, db, msg): """Raises a WARNING message on the given pg.DB.""" @@ -72,6 +80,18 @@ class ConnectTestCase(unittest.TestCase): self.assertEqual(result, '"$user",public') + def test_search_path_cve_2018_1058(self): + with dbconn.connect(self.url) as conn: + dbconn.execSQL(conn, "CREATE TABLE public.Names (name VARCHAR(255))") + dbconn.execSQL(conn, "INSERT INTO public.Names VALUES ('AAA')") + + dbconn.execSQL(conn, "CREATE FUNCTION public.lower(VARCHAR) RETURNS text AS $$ " + " SELECT 'Alice was here: ' || $1;" + "$$ LANGUAGE SQL IMMUTABLE;") + + name = dbconn.execSQLForSingleton(conn, "SELECT lower(name) FROM public.Names") + self.assertEqual(name, 'aaa') + def test_no_transaction_after_connect(self): with dbconn.connect(self.url) as conn: db = pg.DB(conn)