提交 a8cc0a63 编写于 作者: C Cleber Rosa

avocado.utils.archive: add gzip uncompression support

A gzip file doesn't have all the "archive" properties that a tarfile
or zipfile has, but still, it could be useful to wrap in the archive
module.

At this point, support is being added to allow for files compressed
with gzip, to be acknowledged as valid in avocado.utils.is_archive()
and transparently handled with avocado.util.uncompress().

The goal of this implementation is to allow for gzip files, which
are compressed tarbals, to be properly handled at uncompress() time.
Signed-off-by: NCleber Rosa <crosa@redhat.com>
上级 4f4e428d
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Module to help extract and create compressed archives. Module to help extract and create compressed archives.
""" """
import gzip
import logging import logging
import os import os
import platform import platform
...@@ -39,6 +40,37 @@ except ImportError: ...@@ -39,6 +40,37 @@ except ImportError:
LZMA_CAPABLE = False LZMA_CAPABLE = False
#: The first two bytes that all gzip files start with
GZIP_MAGIC = b'\037\213'
def is_gzip_file(path):
"""
Checks if file given by path has contents that suggests gzip file
"""
with open(path, 'rb') as gzip_file:
return gzip_file.read(len(GZIP_MAGIC)) == GZIP_MAGIC
def gzip_uncompress(path, output_path):
"""
Uncompress a gzipped file at path, to either a file or dir at output_path
"""
with gzip.GzipFile(filename=path, mode='rb') as input_file:
if os.path.isdir(output_path):
basename = os.path.basename(path)
if basename.endswith('.gz'):
basename = basename[:-3]
output_path = os.path.join(output_path, basename)
with open(output_path, 'wb') as output_file:
while True:
chunk = input_file.read(4096)
if not chunk:
break
output_file.write(chunk)
return output_path
class ArchiveException(Exception): class ArchiveException(Exception):
""" """
Base exception for all archive errors. Base exception for all archive errors.
...@@ -243,7 +275,8 @@ def is_archive(filename): ...@@ -243,7 +275,8 @@ def is_archive(filename):
:param filename: file to test. :param filename: file to test.
:return: `True` if it is an archive. :return: `True` if it is an archive.
""" """
return zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) return (zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) or
is_gzip_file(filename))
def compress(filename, path): def compress(filename, path):
...@@ -272,6 +305,9 @@ def uncompress(filename, path): ...@@ -272,6 +305,9 @@ def uncompress(filename, path):
:param filename: archive file name. :param filename: archive file name.
:param path: destination path to extract to. :param path: destination path to extract to.
""" """
if is_gzip_file(filename) and not tarfile.is_tarfile(filename):
return gzip_uncompress(filename, path)
else:
with ArchiveFile.open(filename) as x: with ArchiveFile.open(filename) as x:
return x.extract(path) return x.extract(path)
......
...@@ -155,6 +155,32 @@ class ArchiveTest(unittest.TestCase): ...@@ -155,6 +155,32 @@ class ArchiveTest(unittest.TestCase):
self.assertEqual(ret, None, "Empty archive should return None (%s)" self.assertEqual(ret, None, "Empty archive should return None (%s)"
% ret) % ret)
def test_is_gzip_file(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
self.assertTrue(archive.is_gzip_file(gz_path))
def test_gzip_uncompress_to_dir(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
ret = archive.gzip_uncompress(gz_path, self.decompressdir)
self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado'))
def test_gzip_uncompress_to_file(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
filename = os.path.join(self.decompressdir, 'other')
ret = archive.gzip_uncompress(gz_path, filename)
self.assertEqual(ret, filename)
def test_gzip_is_archive(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
self.assertTrue(archive.is_archive(gz_path))
def test_uncompress_gzip(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
ret = archive.uncompress(gz_path, self.decompressdir)
self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado'))
with open(ret, 'rb') as decompressed:
self.assertEqual(decompressed.read(), b'avocado\n')
def tearDown(self): def tearDown(self):
try: try:
shutil.rmtree(self.basedir) shutil.rmtree(self.basedir)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册