提交 09a16998 编写于 作者: R Rudá Moura 提交者: Rudá Moura

Merge pull request #106 from lmr/ruda-new_archive_module

New archive module
......@@ -331,7 +331,8 @@ class Job(object):
# Let's clean up test artifacts
if self.args is not None:
if self.args.archive:
archive.create_zip(self.debugdir, self.debugdir)
filename = self.debugdir + '.zip'
archive.create(filename, self.debugdir)
if not self.args.keep_tmp_files:
data_dir.clean_tmp_files()
......
# license: MIT
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Based on django.utils.archive, which is on its turn Based on "python-archive"
# http://pypi.python.org/pypi/python-archive/
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# Copyright (c) 2010 Gary Wilson Jr. <gary.wilson@gmail.com> and contributors.
# See LICENSE for more details.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# Copyright: Red Hat Inc. 2014
# Author: Ruda Moura <rmoura@redhat.com>
"""
Library used to transparently uncompress compressed files.
Module to help extract and create compressed archives.
"""
import logging
import os
import shutil
import tarfile
import zipfile
log = logging.getLogger('avocado.test')
import tarfile
class ArchiveException(Exception):
"""
Base exception class for all archive errors.
Base exception for all archive errors.
"""
pass
class UnrecognizedArchiveFormat(ArchiveException):
class ArchiveFile(object):
"""
Error raised when passed file is not a recognized archive format.
"""
pass
Class that represents an Archive file.
def extract(path, to_path=''):
"""
Unpack the tar or zip file at the specified path to the directory
specified by to_path.
Archives are ZIP files or Tarballs.
"""
with Archive(path) as archive:
archive.extract(to_path)
# extension info: is_zip, is_tar, zipfile|tarfile, +mode
_extension_table = {
'.zip': (True, False, zipfile.ZipFile, ''),
'.tar': (False, True, tarfile.open, ''),
'.tar.gz': (False, True, tarfile.open, ':gz'),
'.tgz': (False, True, tarfile.open, ':gz'),
'.tar.bz2': (False, True, tarfile.open, ':bz2'),
'.tbz2': (False, True, tarfile.open, ':bz2'), }
class Archive(object):
"""
The external API class that encapsulates an archive implementation.
"""
def __init__(self, path):
self._archive = self._archive_cls(path)(path)
def __init__(self, filename, mode='r'):
"""
Creates an instance of :class:`ArchiveFile`.
@staticmethod
def _archive_cls(path):
cls = None
if isinstance(path, basestring):
filename = path
:param filename: the archive file name.
:param mode: file mode, `r` read, `w` write.
"""
self.filename = filename
self.mode = mode
engine = None
for ext in ArchiveFile._extension_table:
if filename.endswith(ext):
(self.is_zip,
self.is_tar,
engine,
extra_mode) = ArchiveFile._extension_table[ext]
if engine is not None:
self.mode += extra_mode
self._engine = engine(self.filename, self.mode)
else:
try:
filename = path.name
except AttributeError:
raise UnrecognizedArchiveFormat(
"File object not a recognized archive format.")
base, tail_ext = os.path.splitext(filename.lower())
cls = extension_map.get(tail_ext)
if not cls:
base, ext = os.path.splitext(base)
cls = extension_map.get(ext)
if not cls:
raise UnrecognizedArchiveFormat(
"Path not a recognized archive format: %s" % filename)
return cls
raise ArchiveException('file is not an archive')
def __repr__(self):
return "ArchiveFile('%s', '%s')" % (self.filename, self.mode)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def extract(self, to_path=''):
self._archive.extract(to_path)
def list(self):
self._archive.list()
def close(self):
self._archive.close()
def __exit__(self, exc_type, exc_value, exc_traceback):
if self._engine is not None:
self.close()
@classmethod
def open(cls, filename, mode='r'):
"""
Creates an instance of :class:`ArchiveFile`.
class BaseArchive(object):
:param filename: the archive file name.
:param mode: file mode, `r` read, `w` write.
"""
return cls(filename, mode)
"""
Base Archive class. Implementations should inherit this class.
"""
def add(self, filename, arcname=None):
"""
Add file to the archive.
def split_leading_dir(self, path):
path = str(path)
path = path.lstrip('/').lstrip('\\')
if '/' in path and (('\\' in path and path.find('/') < path.find('\\'))
or '\\' not in path):
return path.split('/', 1)
elif '\\' in path:
return path.split('\\', 1)
:param filename: file to archive.
:param arcname: alternative name for the file in the archive.
"""
if self.is_zip:
self._engine.write(filename, arcname, zipfile.ZIP_DEFLATED)
else:
return path, ''
self._engine.add(filename, arcname)
def has_leading_dir(self, paths):
def list(self):
"""
Returns true if all the paths have the same leading path name
(i.e., everything is in one subdirectory in an archive)
List files to the standard output.
"""
common_prefix = None
for path in paths:
prefix, _ = self.split_leading_dir(path)
if not prefix:
return False
elif common_prefix is None:
common_prefix = prefix
elif prefix != common_prefix:
return False
return True
def extract(self):
raise NotImplementedError('Subclasses of BaseArchive must provide an '
'extract() method')
if self.is_zip:
self._engine.printdir()
else:
self._engine.list()
def list(self):
raise NotImplementedError('Subclasses of BaseArchive must provide a '
'list() method')
class TarArchive(BaseArchive):
def __init__(self, path):
self._archive = tarfile.open(path)
def list(self, *args, **kwargs):
self._archive.list(*args, **kwargs)
def extract(self, to_path):
# note: python<=2.5 doesn't seem to know about pax headers, filter them
members = [member for member in self._archive.getmembers()
if member.name != 'pax_global_header']
leading = self.has_leading_dir(members)
for member in members:
name = member.name
if leading:
name = self.split_leading_dir(name)[1]
filename = os.path.join(to_path, name)
if member.isdir():
if filename and not os.path.exists(filename):
os.makedirs(filename)
else:
try:
extracted = self._archive.extractfile(member)
except (KeyError, AttributeError) as exc:
# Some corrupt tar files seem to produce this
# (specifically bad symlinks)
log.error("In the tar file %s the member %s is "
"invalid: %s" % (name, member.name, exc))
else:
dirname = os.path.dirname(filename)
if dirname and not os.path.exists(dirname):
os.makedirs(dirname)
with open(filename, 'wb') as outfile:
if extracted is not None:
shutil.copyfileobj(extracted, outfile)
if not member.issym():
os.chmod(filename, member.mode)
else:
log.error("Member correspondent to file %s does "
"not seem to be a regular file or a link",
filename)
finally:
if extracted:
extracted.close()
def extract(self, path='.'):
"""
Extract all files from the archive.
def close(self):
self._archive.close()
class ZipArchive(BaseArchive):
def __init__(self, path):
self._archive = zipfile.ZipFile(path)
def list(self, *args, **kwargs):
self._archive.printdir(*args, **kwargs)
def extract(self, to_path):
namelist = self._archive.namelist()
leading = self.has_leading_dir(namelist)
for name in namelist:
data = self._archive.read(name)
if leading:
name = self.split_leading_dir(name)[1]
filename = os.path.join(to_path, name)
dirname = os.path.dirname(filename)
if dirname and not os.path.exists(dirname):
os.makedirs(dirname)
if filename.endswith(('/', '\\')):
# A directory
if not os.path.exists(filename):
os.makedirs(filename)
else:
with open(filename, 'wb') as outfile:
outfile.write(data)
:param path: destination path.
"""
self._engine.extractall(path)
def close(self):
self._archive.close()
extension_map = {
'.tar': TarArchive,
'.tar.bz2': TarArchive,
'.tar.gz': TarArchive,
'.tgz': TarArchive,
'.tz2': TarArchive,
'.zip': ZipArchive,
}
# Handy functions for ZIP files
"""
Close archive.
"""
self._engine.close()
def create_zip(name, path):
def is_archive(filename):
"""
Create a ZIP archive from a directory.
Test if a given file is an archive.
:param name: the name of the zip file. The .zip suffix is optional.
:param path: the directory with files to compress.
:param filename: file to test.
:return: `True` if it is an archive.
"""
if name.endswith('.zip') is False:
name += '.zip'
with zipfile.ZipFile(name, 'w') as zf:
for root, _, files in os.walk(path):
for f in files:
newroot = root.replace(path, '')
zf.write(os.path.join(root, f),
os.path.join(newroot, f), zipfile.ZIP_DEFLATED)
return zipfile.is_zipfile(filename) or tarfile.is_tarfile(filename)
def uncompress_zip(name, path):
def compress(filename, path):
"""
Uncompress a ZIP archive under a directory.
Compress files in an archive.
:param name: the name of the zip file. The .zip suffix is optional.
:param path: the directory to uncompress de files.
:param filename: archive file name.
:param path: origin directory path to files to compress. No
individual files allowed.
"""
with ArchiveFile.open(filename, 'w') as x:
if os.path.isdir(path):
for root, _, files in os.walk(path):
for name in files:
newroot = root.replace(path, '')
x.add(os.path.join(root, name),
os.path.join(newroot, name))
elif os.path.isfile(path):
x.add(path, os.path.basename(path))
def uncompress(filename, path):
"""
with zipfile.ZipFile(name) as zf:
zf.extractall(path)
Extract files from an archive.
:param filename: archive file name.
:param path: destination path to extract to.
"""
with ArchiveFile.open(filename) as x:
x.extract(path)
# Some aliases
create = compress
extract = uncompress
#!/usr/bin/env python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright: Red Hat Inc. 2013-2014
# Author: Lucas Meneghel Rodrigues <lmr@redhat.com>
import unittest
import tempfile
import os
import sys
import shutil
import random
# simple magic for using scripts within a source tree
basedir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
basedir = os.path.dirname(basedir)
if os.path.isdir(os.path.join(basedir, 'avocado')):
sys.path.append(basedir)
from avocado.utils import crypto
from avocado.utils import archive
class ArchiveTest(unittest.TestCase):
def setUp(self):
self.basedir = tempfile.mkdtemp(prefix='archive_datadir_unittest')
self.compressdir = tempfile.mkdtemp(dir=self.basedir)
self.decompressdir = tempfile.mkdtemp(dir=self.basedir)
self.sys_random = random.SystemRandom()
def compress_and_check_dir(self, extension):
hash_map_1 = {}
for i in xrange(self.sys_random.randint(10, 20)):
if i % 2 == 0:
compressdir = tempfile.mkdtemp(dir=self.compressdir)
else:
compressdir = self.compressdir
str_length = self.sys_random.randint(30, 50)
fd, filename = tempfile.mkstemp(dir=compressdir, text=True)
os.write(fd, crypto.get_random_string(str_length))
relative_path = filename.replace(self.compressdir, '')
hash_map_1[relative_path] = crypto.hash_file(filename)
archive_filename = self.compressdir + extension
archive.compress(archive_filename, self.compressdir)
archive.uncompress(archive_filename, self.decompressdir)
hash_map_2 = {}
for root, _, files in os.walk(self.decompressdir):
for name in files:
file_path = os.path.join(root, name)
relative_path = file_path.replace(self.decompressdir, '')
hash_map_2[relative_path] = crypto.hash_file(file_path)
self.assertEqual(hash_map_1, hash_map_2)
def compress_and_check_file(self, extension):
str_length = self.sys_random.randint(30, 50)
fd, filename = tempfile.mkstemp(dir=self.basedir, text=True)
os.write(fd, crypto.get_random_string(str_length))
original_hash = crypto.hash_file(filename)
dstfile = filename + extension
archive_filename = os.path.join(self.basedir, dstfile)
archive.compress(archive_filename, filename)
archive.uncompress(archive_filename, self.decompressdir)
decompress_file = os.path.join(self.decompressdir,
os.path.basename(filename))
decompress_hash = crypto.hash_file(decompress_file)
self.assertEqual(original_hash, decompress_hash)
def test_zip_dir(self):
self.compress_and_check_dir('.zip')
def test_zip_file(self):
self.compress_and_check_file('.zip')
def test_tar_dir(self):
self.compress_and_check_dir('.tar')
def test_tar_file(self):
self.compress_and_check_file('.tar')
def test_tgz_dir(self):
self.compress_and_check_dir('.tar.gz')
def test_tgz_file(self):
self.compress_and_check_file('.tar.gz')
def test_tgz_2_dir(self):
self.compress_and_check_dir('.tgz')
def test_tgz_2_file(self):
self.compress_and_check_file('.tgz')
def test_tbz2_dir(self):
self.compress_and_check_dir('.tar.bz2')
def test_tbz2_file(self):
self.compress_and_check_file('.tar.bz2')
def test_tbz2_2_dir(self):
self.compress_and_check_dir('.tbz2')
def test_tbz2_2_file(self):
self.compress_and_check_file('.tbz2')
def tearDown(self):
pass
try:
shutil.rmtree(self.basedir)
except OSError:
pass
if __name__ == '__main__':
unittest.main()
......@@ -16,7 +16,6 @@
from avocado import test
from avocado import job
from avocado.utils import download, archive, build
from avocado.linux import kernel_build
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册