import hashlib
import os
import shutil
import sys
from io import BytesIO
from shutil import copy, rmtree
from tempfile import mkdtemp

import pytest
from mock import Mock, patch

from pip._internal.download import (
    _copy_source_tree,
    _download_http_url,
    parse_content_disposition,
    sanitize_content_filename,
    unpack_file_url,
    unpack_http_url,
)
from pip._internal.exceptions import HashMismatch
from pip._internal.models.link import Link
from pip._internal.network.session import PipSession
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.urls import path_to_url
from tests.lib import create_file
from tests.lib.filesystem import (
    get_filelist,
    make_socket_file,
    make_unreadable_file,
)
from tests.lib.path import Path


def test_unpack_http_url_with_urllib_response_without_content_type(data):
    """
    It should download and unpack files even if no Content-Type header exists
    """
    _real_session = PipSession()

    def _fake_session_get(*args, **kwargs):
        # Delegate to the real session, then strip the header under test.
        resp = _real_session.get(*args, **kwargs)
        del resp.headers["Content-Type"]
        return resp

    session = Mock()
    session.get = _fake_session_get

    uri = path_to_url(data.packages.joinpath("simple-1.0.tar.gz"))
    link = Link(uri)
    temp_dir = mkdtemp()
    try:
        unpack_http_url(
            link,
            temp_dir,
            download_dir=None,
            session=session,
        )
        assert set(os.listdir(temp_dir)) == {
            'PKG-INFO', 'setup.cfg', 'setup.py', 'simple', 'simple.egg-info'
        }
    finally:
        # Release the real session created above before removing the
        # scratch directory.
        _real_session.close()
        rmtree(temp_dir)


class FakeStream(object):
    """
    In-memory stand-in for the ``raw`` attribute of a response object,
    backed by a BytesIO holding ``contents``.
    """

    def __init__(self, contents):
        self._io = BytesIO(contents)

    def read(self, size, decode_content=None):
        """Return up to ``size`` bytes; ``decode_content`` is ignored."""
        return self._io.read(size)

    def stream(self, size, decode_content=None):
        """
        Yield the remaining contents in chunks of at most ``size`` bytes;
        ``decode_content`` is ignored.
        """
        # Keep reading until the buffer reports EOF (an empty read) so that
        # contents longer than ``size`` are not silently truncated.
        for chunk in iter(lambda: self._io.read(size), b''):
            yield chunk

    def release_conn(self):
        """No-op: there is no connection to release."""
        pass


class MockResponse(object):
    """
    Bare-bones response object: a 200 status, empty headers and history,
    and ``contents`` exposed both as ``content`` and through ``raw``.
    """

    def __init__(self, contents):
        self.raw = FakeStream(contents)
        self.content = contents
        self.request = None
        self.status_code = 200
        self.connection = None
        self.url = None
        self.headers = {}
        self.history = []

    def raise_for_status(self):
        """No-op: this fake never signals an HTTP error."""
        pass


class MockConnection(object):
    """
    Connection fake whose ``send`` runs the request's "response" hooks.
    Subclasses (or tests) must provide ``_send``.
    """

    def _send(self, req, **kwargs):
        raise NotImplementedError("_send must be overridden for tests")

    def send(self, req, **kwargs):
        """
        Obtain a response from ``_send``, pass it to every callback
        registered under "response" in ``req.hooks``, and return it.
        """
        resp = self._send(req, **kwargs)
        for cb in req.hooks.get("response", []):
            cb(resp)
        return resp


class MockRequest(object):
    """Request fake carrying a URL, headers and registered hooks."""

    def __init__(self, url):
        self.url = url
        self.headers = {}
        self.hooks = {}

    def register_hook(self, event_name, callback):
        """Append ``callback`` to the list of hooks for ``event_name``."""
        self.hooks.setdefault(event_name, []).append(callback)


@patch('pip._internal.download.unpack_file')
def test_unpack_http_url_bad_downloaded_checksum(mock_unpack_file):
    """
    If already-downloaded file has bad checksum, re-download.
    """
    base_url = 'http://www.example.com/somepackage.tgz'
    contents = b'downloaded'
    download_hash = hashlib.new('sha1', contents)
    link = Link(base_url + '#sha1=' + download_hash.hexdigest())

    session = Mock()
    session.get = Mock()
    response = session.get.return_value = MockResponse(contents)
    response.headers = {'content-type': 'application/x-tar'}
    response.url = base_url

    download_dir = mkdtemp()
    try:
        # Pre-populate the download dir with a file whose hash is wrong.
        downloaded_file = os.path.join(download_dir, 'somepackage.tgz')
        create_file(downloaded_file, 'some contents')

        unpack_http_url(
            link,
            'location',
            download_dir=download_dir,
            session=session,
            hashes=Hashes({'sha1': [download_hash.hexdigest()]})
        )

        # despite existence of downloaded file with bad hash, downloaded again
        session.get.assert_called_once_with(
            'http://www.example.com/somepackage.tgz',
            headers={"Accept-Encoding": "identity"},
            stream=True,
        )
        # cached file is replaced with newly downloaded file
        with open(downloaded_file) as fh:
            assert fh.read() == 'downloaded'

    finally:
        rmtree(download_dir)


@pytest.mark.parametrize("filename, expected", [
    ('dir/file', 'file'),
    ('../file', 'file'),
    ('../../file', 'file'),
    ('../', ''),
    ('../..', '..'),
    ('/', ''),
])
def test_sanitize_content_filename(filename, expected):
    """
    Test inputs where the result is the same for Windows and non-Windows.
    """
    assert sanitize_content_filename(filename) == expected


@pytest.mark.parametrize("filename, win_expected, non_win_expected", [
    ('dir\\file', 'file', 'dir\\file'),
    ('..\\file', 'file', '..\\file'),
    ('..\\..\\file', 'file', '..\\..\\file'),
    ('..\\', '', '..\\'),
    ('..\\..', '..', '..\\..'),
    ('\\', '', '\\'),
])
def test_sanitize_content_filename__platform_dependent(
    filename,
    win_expected,
    non_win_expected
):
    """
    Test inputs where the result is different for Windows and non-Windows.
    """
    if sys.platform == 'win32':
        expected = win_expected
    else:
        expected = non_win_expected
    assert sanitize_content_filename(filename) == expected


@pytest.mark.parametrize("content_disposition, default_filename, expected", [
    ('attachment;filename="../file"', 'df', 'file'),
])
def test_parse_content_disposition(
    content_disposition,
    default_filename,
    expected
):
    """
    Test that the filename parsed from a Content-Disposition header matches
    the expected value.
    """
    actual = parse_content_disposition(content_disposition, default_filename)
    assert actual == expected


def test_download_http_url__no_directory_traversal(tmpdir):
    """
    Test that directory traversal doesn't happen on download when the
    Content-Disposition header contains a filename with a ".." path part.
    """
    mock_url = 'http://www.example.com/whatever.tgz'
    contents = b'downloaded'
    link = Link(mock_url)

    session = Mock()
    resp = MockResponse(contents)
    resp.url = mock_url
    resp.headers = {
        # Set the content-type to a random value to prevent
        # mimetypes.guess_extension from guessing the extension.
        'content-type': 'random',
        'content-disposition': 'attachment;filename="../out_dir_file"'
    }
    session.get.return_value = resp

    download_dir = tmpdir.joinpath('download')
    os.mkdir(download_dir)
    # Only the on-disk result matters here, so the return value is ignored.
    _download_http_url(
        link,
        session,
        download_dir,
        hashes=None,
        progress_bar='on',
    )
    # The file should be downloaded to download_dir.
    actual = os.listdir(download_dir)
    assert actual == ['out_dir_file']


@pytest.fixture
def clean_project(tmpdir_factory, data):
    """
    Copy the "FSPkg" package from the test data into a fresh temporary
    directory and return the path of the copy.
    """
    tmpdir = Path(str(tmpdir_factory.mktemp("clean_project")))
    new_project_dir = tmpdir.joinpath("FSPkg")
    path = data.packages.joinpath("FSPkg")
    shutil.copytree(path, new_project_dir)
    return new_project_dir


def test_copy_source_tree(clean_project, tmpdir):
    """
    Test that copying the project yields the same file list in the target.
    """
    target = tmpdir.joinpath("target")
    expected_files = get_filelist(clean_project)
    assert len(expected_files) == 3

    _copy_source_tree(clean_project, target)

    copied_files = get_filelist(target)
    assert expected_files == copied_files


@pytest.mark.skipif("sys.platform == 'win32' or sys.version_info < (3,)")
def test_copy_source_tree_with_socket(clean_project, tmpdir, caplog):
    """
    Test that a socket file in the source tree is not copied and that a
    single WARNING mentioning its path is logged.
    """
    target = tmpdir.joinpath("target")
    # Captured before the socket is created, so it excludes the socket.
    expected_files = get_filelist(clean_project)
    socket_path = str(clean_project.joinpath("aaa"))
    make_socket_file(socket_path)

    _copy_source_tree(clean_project, target)

    copied_files = get_filelist(target)
    assert expected_files == copied_files

    # Warning should have been logged.
    assert len(caplog.records) == 1
    record = caplog.records[0]
    assert record.levelname == 'WARNING'
    assert socket_path in record.message


@pytest.mark.skipif("sys.platform == 'win32' or sys.version_info < (3,)")
def test_copy_source_tree_with_socket_fails_with_no_socket_error(
    clean_project, tmpdir
):
    """
    Test that with both a socket file and an unreadable file in the source
    tree, the raised shutil.Error reports only the unreadable file.
    """
    target = tmpdir.joinpath("target")
    # Captured before the extra files are created.
    expected_files = get_filelist(clean_project)
    make_socket_file(clean_project.joinpath("aaa"))
    unreadable_file = clean_project.joinpath("bbb")
    make_unreadable_file(unreadable_file)

    with pytest.raises(shutil.Error) as e:
        _copy_source_tree(clean_project, target)

    errored_files = [err[0] for err in e.value.args[0]]
    assert len(errored_files) == 1
    assert unreadable_file in errored_files

    copied_files = get_filelist(target)
    # All files without errors should have been copied.
    assert expected_files == copied_files


def test_copy_source_tree_with_unreadable_dir_fails(clean_project, tmpdir):
    """
    Test that an unreadable file makes the copy raise shutil.Error listing
    just that file, while the remaining files are still copied.
    """
    target = tmpdir.joinpath("target")
    # Captured before the unreadable file is created.
    expected_files = get_filelist(clean_project)
    unreadable_file = clean_project.joinpath("bbb")
    make_unreadable_file(unreadable_file)

    with pytest.raises(shutil.Error) as e:
        _copy_source_tree(clean_project, target)

    errored_files = [err[0] for err in e.value.args[0]]
    assert len(errored_files) == 1
    assert unreadable_file in errored_files

    copied_files = get_filelist(target)
    # All files without errors should have been copied.
    assert expected_files == copied_files


class Test_unpack_file_url(object):
    """Tests for unpack_file_url using the "simple" sdists in the test data."""

    def prep(self, tmpdir, data):
        """
        Create the build and download directories under ``tmpdir`` and
        record paths and links for the simple-1.0 and simple-2.0 sdists.
        """
        self.build_dir = tmpdir.joinpath('build')
        self.download_dir = tmpdir.joinpath('download')
        os.mkdir(self.build_dir)
        os.mkdir(self.download_dir)
        self.dist_file = "simple-1.0.tar.gz"
        self.dist_file2 = "simple-2.0.tar.gz"
        self.dist_path = data.packages.joinpath(self.dist_file)
        self.dist_path2 = data.packages.joinpath(self.dist_file2)
        self.dist_url = Link(path_to_url(self.dist_path))
        self.dist_url2 = Link(path_to_url(self.dist_path2))

    def test_unpack_file_url_no_download(self, tmpdir, data):
        """
        Test that nothing is written to the download dir when no
        download_dir is passed.
        """
        self.prep(tmpdir, data)
        unpack_file_url(self.dist_url, self.build_dir)
        assert os.path.isdir(os.path.join(self.build_dir, 'simple'))
        assert not os.path.isfile(
            os.path.join(self.download_dir, self.dist_file))

    def test_unpack_file_url_and_download(self, tmpdir, data):
        """
        Test that the archive is unpacked and also saved to download_dir.
        """
        self.prep(tmpdir, data)
        unpack_file_url(self.dist_url, self.build_dir,
                        download_dir=self.download_dir)
        assert os.path.isdir(os.path.join(self.build_dir, 'simple'))
        assert os.path.isfile(os.path.join(self.download_dir, self.dist_file))

    def test_unpack_file_url_download_already_exists(self, tmpdir,
                                                     data, monkeypatch):
        """
        Test that an existing file in download_dir is not overwritten.
        """
        self.prep(tmpdir, data)
        # add in previous download (copy simple-2.0 as simple-1.0)
        # so we can tell it didn't get overwritten
        dest_file = os.path.join(self.download_dir, self.dist_file)
        copy(self.dist_path2, dest_file)
        with open(self.dist_path2, 'rb') as f:
            dist_path2_md5 = hashlib.md5(f.read()).hexdigest()

        unpack_file_url(self.dist_url, self.build_dir,
                        download_dir=self.download_dir)
        # our hash should be the same, i.e. not overwritten by simple-1.0 hash
        with open(dest_file, 'rb') as f:
            assert dist_path2_md5 == hashlib.md5(f.read()).hexdigest()

    def test_unpack_file_url_bad_hash(self, tmpdir, data,
                                      monkeypatch):
        """
        Test when the file url hash fragment is wrong
        """
        self.prep(tmpdir, data)
        url = '{}#md5=bogus'.format(self.dist_url.url)
        dist_url = Link(url)
        with pytest.raises(HashMismatch):
            unpack_file_url(dist_url,
                            self.build_dir,
                            hashes=Hashes({'md5': ['bogus']}))

    def test_unpack_file_url_download_bad_hash(self, tmpdir, data,
                                               monkeypatch):
        """
        Test when existing download has different hash from the file url
        fragment
        """
        self.prep(tmpdir, data)

        # add in previous download (copy simple-2.0 as simple-1.0 so it's wrong
        # hash)
        dest_file = os.path.join(self.download_dir, self.dist_file)
        copy(self.dist_path2, dest_file)

        with open(self.dist_path, 'rb') as f:
            dist_path_md5 = hashlib.md5(f.read()).hexdigest()
        with open(dest_file, 'rb') as f:
            dist_path2_md5 = hashlib.md5(f.read()).hexdigest()

        # Sanity check: the pre-existing download really is the wrong file.
        assert dist_path_md5 != dist_path2_md5

        url = '{}#md5={}'.format(self.dist_url.url, dist_path_md5)
        dist_url = Link(url)
        unpack_file_url(dist_url, self.build_dir,
                        download_dir=self.download_dir,
                        hashes=Hashes({'md5': [dist_path_md5]}))

        # confirm hash is for simple-1.0
        # the previous bad download has been removed
        with open(dest_file, 'rb') as f:
            assert hashlib.md5(f.read()).hexdigest() == dist_path_md5

    def test_unpack_file_url_thats_a_dir(self, tmpdir, data):
        """
        Test that a link pointing at a directory is unpacked into build_dir.
        """
        self.prep(tmpdir, data)
        dist_path = data.packages.joinpath("FSPkg")
        dist_url = Link(path_to_url(dist_path))
        unpack_file_url(dist_url, self.build_dir,
                        download_dir=self.download_dir)
        assert os.path.isdir(os.path.join(self.build_dir, 'fspkg'))


@pytest.mark.parametrize('exclude_dir', [
    '.nox',
    '.tox'
])
def test_unpack_file_url_excludes_expected_dirs(tmpdir, exclude_dir):
    """
    Test that a top-level ``exclude_dir`` is left out of the unpacked tree
    while a directory of the same name nested under ``subdir`` is kept.
    """
    src_dir = tmpdir / 'src'
    dst_dir = tmpdir / 'dst'
    src_included_file = src_dir.joinpath('file.txt')
    src_excluded_dir = src_dir.joinpath(exclude_dir)
    src_excluded_file = src_dir.joinpath(exclude_dir, 'file.txt')
    src_included_dir = src_dir.joinpath('subdir', exclude_dir)

    # set up source directory
    src_excluded_dir.mkdir(parents=True)
    src_included_dir.mkdir(parents=True)
    src_included_file.touch()
    src_excluded_file.touch()

    dst_included_file = dst_dir.joinpath('file.txt')
    dst_excluded_dir = dst_dir.joinpath(exclude_dir)
    dst_excluded_file = dst_dir.joinpath(exclude_dir, 'file.txt')
    dst_included_dir = dst_dir.joinpath('subdir', exclude_dir)

    src_link = Link(path_to_url(src_dir))
    unpack_file_url(
        src_link,
        dst_dir,
        download_dir=None
    )
    assert not os.path.isdir(dst_excluded_dir)
    assert not os.path.isfile(dst_excluded_file)
    assert os.path.isfile(dst_included_file)
    assert os.path.isdir(dst_included_dir)