From a8cc0a63668bcf7bfc7502cf507fafcf5b0e7d05 Mon Sep 17 00:00:00 2001 From: Cleber Rosa Date: Mon, 26 Nov 2018 18:59:46 -0500 Subject: [PATCH] avocado.utils.archive: add gzip uncompression support A gzip file doesn't have all the "archive" properties that a tarfile or zipfile has, but still, it could be useful to wrap in the archive module. At this point, support is being added to allow for files compressed with gzip, to be acknowledged as valid in avocado.utils.is_archive() and transparently handled with avocado.util.uncompress(). The goal of this implementation is to allow for gzip files, which are compressed tarbals, to be properly handled at uncompress() time. Signed-off-by: Cleber Rosa --- avocado/utils/archive.py | 42 ++++++++++++++++++++++++++++++--- selftests/.data/avocado.gz | Bin 0 -> 36 bytes selftests/unit/test_archive.py | 26 ++++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 selftests/.data/avocado.gz diff --git a/avocado/utils/archive.py b/avocado/utils/archive.py index 29992065..b969c205 100644 --- a/avocado/utils/archive.py +++ b/avocado/utils/archive.py @@ -15,6 +15,7 @@ Module to help extract and create compressed archives. """ +import gzip import logging import os import platform @@ -39,6 +40,37 @@ except ImportError: LZMA_CAPABLE = False +#: The first two bytes that all gzip files start with +GZIP_MAGIC = b'\037\213' + + +def is_gzip_file(path): + """ + Checks if file given by path has contents that suggests gzip file + """ + with open(path, 'rb') as gzip_file: + return gzip_file.read(len(GZIP_MAGIC)) == GZIP_MAGIC + + +def gzip_uncompress(path, output_path): + """ + Uncompress a gzipped file at path, to either a file or dir at output_path + """ + with gzip.GzipFile(filename=path, mode='rb') as input_file: + if os.path.isdir(output_path): + basename = os.path.basename(path) + if basename.endswith('.gz'): + basename = basename[:-3] + output_path = os.path.join(output_path, basename) + with open(output_path, 'wb') as output_file: + while True: + chunk = input_file.read(4096) + if not chunk: + break + output_file.write(chunk) + return output_path + + class ArchiveException(Exception): """ Base exception for all archive errors. @@ -243,7 +275,8 @@ def is_archive(filename): :param filename: file to test. :return: `True` if it is an archive. """ - return zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) + return (zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) or + is_gzip_file(filename)) def compress(filename, path): @@ -272,8 +305,11 @@ def uncompress(filename, path): :param filename: archive file name. :param path: destination path to extract to. """ - with ArchiveFile.open(filename) as x: - return x.extract(path) + if is_gzip_file(filename) and not tarfile.is_tarfile(filename): + return gzip_uncompress(filename, path) + else: + with ArchiveFile.open(filename) as x: + return x.extract(path) # Some aliases diff --git a/selftests/.data/avocado.gz b/selftests/.data/avocado.gz new file mode 100644 index 0000000000000000000000000000000000000000..0d9ccab4c19492e4ac474bffc2260c8edfeffa57 GIT binary patch literal 36 rcmb2|=HL)*{S(c=oLH8hoS2f&;H`7o-_Pgdb0&tx?(*^+3=9ka)9nhG literal 0 HcmV?d00001 diff --git a/selftests/unit/test_archive.py b/selftests/unit/test_archive.py index 12211c6a..37324b8d 100644 --- a/selftests/unit/test_archive.py +++ b/selftests/unit/test_archive.py @@ -155,6 +155,32 @@ class ArchiveTest(unittest.TestCase): self.assertEqual(ret, None, "Empty archive should return None (%s)" % ret) + def test_is_gzip_file(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + self.assertTrue(archive.is_gzip_file(gz_path)) + + def test_gzip_uncompress_to_dir(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + ret = archive.gzip_uncompress(gz_path, self.decompressdir) + self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado')) + + def test_gzip_uncompress_to_file(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + filename = os.path.join(self.decompressdir, 'other') + ret = archive.gzip_uncompress(gz_path, filename) + self.assertEqual(ret, filename) + + def test_gzip_is_archive(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + self.assertTrue(archive.is_archive(gz_path)) + + def test_uncompress_gzip(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + ret = archive.uncompress(gz_path, self.decompressdir) + self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado')) + with open(ret, 'rb') as decompressed: + self.assertEqual(decompressed.read(), b'avocado\n') + def tearDown(self): try: shutil.rmtree(self.basedir) -- GitLab