diff --git a/avocado/utils/archive.py b/avocado/utils/archive.py index 29992065e8f69fd95a2114cf2b2285c3d3639851..b969c205bb52f9af49b4f8da6341e652d313c822 100644 --- a/avocado/utils/archive.py +++ b/avocado/utils/archive.py @@ -15,6 +15,7 @@ Module to help extract and create compressed archives. """ +import gzip import logging import os import platform @@ -39,6 +40,37 @@ except ImportError: LZMA_CAPABLE = False +#: The first two bytes that all gzip files start with +GZIP_MAGIC = b'\037\213' + + +def is_gzip_file(path): + """ + Checks if file given by path has contents that suggests gzip file + """ + with open(path, 'rb') as gzip_file: + return gzip_file.read(len(GZIP_MAGIC)) == GZIP_MAGIC + + +def gzip_uncompress(path, output_path): + """ + Uncompress a gzipped file at path, to either a file or dir at output_path + """ + with gzip.GzipFile(filename=path, mode='rb') as input_file: + if os.path.isdir(output_path): + basename = os.path.basename(path) + if basename.endswith('.gz'): + basename = basename[:-3] + output_path = os.path.join(output_path, basename) + with open(output_path, 'wb') as output_file: + while True: + chunk = input_file.read(4096) + if not chunk: + break + output_file.write(chunk) + return output_path + + class ArchiveException(Exception): """ Base exception for all archive errors. @@ -243,7 +275,8 @@ def is_archive(filename): :param filename: file to test. :return: `True` if it is an archive. """ - return zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) + return (zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) or + is_gzip_file(filename)) def compress(filename, path): @@ -272,8 +305,11 @@ def uncompress(filename, path): :param filename: archive file name. :param path: destination path to extract to. """ - with ArchiveFile.open(filename) as x: - return x.extract(path) + if is_gzip_file(filename) and not tarfile.is_tarfile(filename): + return gzip_uncompress(filename, path) + else: + with ArchiveFile.open(filename) as x: + return x.extract(path) # Some aliases diff --git a/selftests/.data/avocado.gz b/selftests/.data/avocado.gz new file mode 100644 index 0000000000000000000000000000000000000000..0d9ccab4c19492e4ac474bffc2260c8edfeffa57 Binary files /dev/null and b/selftests/.data/avocado.gz differ diff --git a/selftests/unit/test_archive.py b/selftests/unit/test_archive.py index 12211c6ac2d7af1fa3622acb4d104f79e73db242..37324b8d3057b7db29e1c65b0298643caebe5b90 100644 --- a/selftests/unit/test_archive.py +++ b/selftests/unit/test_archive.py @@ -155,6 +155,32 @@ class ArchiveTest(unittest.TestCase): self.assertEqual(ret, None, "Empty archive should return None (%s)" % ret) + def test_is_gzip_file(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + self.assertTrue(archive.is_gzip_file(gz_path)) + + def test_gzip_uncompress_to_dir(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + ret = archive.gzip_uncompress(gz_path, self.decompressdir) + self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado')) + + def test_gzip_uncompress_to_file(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + filename = os.path.join(self.decompressdir, 'other') + ret = archive.gzip_uncompress(gz_path, filename) + self.assertEqual(ret, filename) + + def test_gzip_is_archive(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + self.assertTrue(archive.is_archive(gz_path)) + + def test_uncompress_gzip(self): + gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz') + ret = archive.uncompress(gz_path, self.decompressdir) + self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado')) + with open(ret, 'rb') as decompressed: + self.assertEqual(decompressed.read(), b'avocado\n') + def tearDown(self): try: shutil.rmtree(self.basedir)