未验证 提交 1c732fdd 编写于 作者: C Caio Carrara

Merge remote-tracking branch 'clebergnu/archive_uncompress_gz_v3'

Signed-off-by: NCaio Carrara <ccarrara@redhat.com>
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
Module to help extract and create compressed archives. Module to help extract and create compressed archives.
""" """
import gzip
import logging import logging
import os import os
import platform import platform
...@@ -39,6 +40,37 @@ except ImportError: ...@@ -39,6 +40,37 @@ except ImportError:
LZMA_CAPABLE = False LZMA_CAPABLE = False
#: The first two bytes that all gzip files start with
GZIP_MAGIC = b'\037\213'
def is_gzip_file(path):
"""
Checks if file given by path has contents that suggests gzip file
"""
with open(path, 'rb') as gzip_file:
return gzip_file.read(len(GZIP_MAGIC)) == GZIP_MAGIC
def gzip_uncompress(path, output_path):
"""
Uncompress a gzipped file at path, to either a file or dir at output_path
"""
with gzip.GzipFile(filename=path, mode='rb') as input_file:
if os.path.isdir(output_path):
basename = os.path.basename(path)
if basename.endswith('.gz'):
basename = basename[:-3]
output_path = os.path.join(output_path, basename)
with open(output_path, 'wb') as output_file:
while True:
chunk = input_file.read(4096)
if not chunk:
break
output_file.write(chunk)
return output_path
class ArchiveException(Exception): class ArchiveException(Exception):
""" """
Base exception for all archive errors. Base exception for all archive errors.
...@@ -243,7 +275,8 @@ def is_archive(filename): ...@@ -243,7 +275,8 @@ def is_archive(filename):
:param filename: file to test. :param filename: file to test.
:return: `True` if it is an archive. :return: `True` if it is an archive.
""" """
return zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) return (zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename) or
is_gzip_file(filename))
def compress(filename, path): def compress(filename, path):
...@@ -272,8 +305,11 @@ def uncompress(filename, path): ...@@ -272,8 +305,11 @@ def uncompress(filename, path):
:param filename: archive file name. :param filename: archive file name.
:param path: destination path to extract to. :param path: destination path to extract to.
""" """
with ArchiveFile.open(filename) as x: if is_gzip_file(filename) and not tarfile.is_tarfile(filename):
return x.extract(path) return gzip_uncompress(filename, path)
else:
with ArchiveFile.open(filename) as x:
return x.extract(path)
# Some aliases # Some aliases
......
...@@ -155,6 +155,32 @@ class ArchiveTest(unittest.TestCase): ...@@ -155,6 +155,32 @@ class ArchiveTest(unittest.TestCase):
self.assertEqual(ret, None, "Empty archive should return None (%s)" self.assertEqual(ret, None, "Empty archive should return None (%s)"
% ret) % ret)
def test_is_gzip_file(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
self.assertTrue(archive.is_gzip_file(gz_path))
def test_gzip_uncompress_to_dir(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
ret = archive.gzip_uncompress(gz_path, self.decompressdir)
self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado'))
def test_gzip_uncompress_to_file(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
filename = os.path.join(self.decompressdir, 'other')
ret = archive.gzip_uncompress(gz_path, filename)
self.assertEqual(ret, filename)
def test_gzip_is_archive(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
self.assertTrue(archive.is_archive(gz_path))
def test_uncompress_gzip(self):
gz_path = os.path.join(BASEDIR, 'selftests', '.data', 'avocado.gz')
ret = archive.uncompress(gz_path, self.decompressdir)
self.assertEqual(ret, os.path.join(self.decompressdir, 'avocado'))
with open(ret, 'rb') as decompressed:
self.assertEqual(decompressed.read(), b'avocado\n')
def tearDown(self): def tearDown(self):
try: try:
shutil.rmtree(self.basedir) shutil.rmtree(self.basedir)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册